You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/22 10:10:01 UTC

[6/6] james-project git commit: JAMES-2647 Cassandra migration task for mapping sources

JAMES-2647 Cassandra migration task for mapping sources


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d51c7085
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d51c7085
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d51c7085

Branch: refs/heads/master
Commit: d51c7085443c947b3f77e64163d55800dc6e782b
Parents: 8938f5f
Author: Rene Cordier <rc...@linagora.com>
Authored: Mon Jan 21 15:02:55 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:09:38 2019 +0700

----------------------------------------------------------------------
 .../versions/CassandraSchemaVersionManager.java |   2 +-
 .../guice/protocols/webadmin-cassandra/pom.xml  |   4 +
 .../modules/server/CassandraRoutesModule.java   |   3 +
 server/data/data-cassandra/pom.xml              |   5 +
 .../cassandra/CassandraMappingsSourcesDAO.java  |   8 +-
 .../CassandraRecipientRewriteTable.java         |   9 +-
 .../CassandraRecipientRewriteTableDAO.java      |  45 ++----
 .../migration/MappingsSourcesMigration.java     |  65 ++++++++
 .../CassandraMappingsSourcesDAOTest.java        |   1 -
 .../CassandraRecipientRewriteTableDAOTest.java  |  10 +-
 .../migration/MappingsSourcesMigrationTest.java | 151 +++++++++++++++++++
 11 files changed, 256 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index d953f7e..172fdb5 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -34,7 +34,7 @@ import com.google.common.base.Preconditions;
 
 public class CassandraSchemaVersionManager {
     public static final SchemaVersion MIN_VERSION = new SchemaVersion(2);
-    public static final SchemaVersion MAX_VERSION = new SchemaVersion(6);
+    public static final SchemaVersion MAX_VERSION = new SchemaVersion(7);
     public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/pom.xml b/server/container/guice/protocols/webadmin-cassandra/pom.xml
index 6554da8..4f7fbcb 100644
--- a/server/container/guice/protocols/webadmin-cassandra/pom.xml
+++ b/server/container/guice/protocols/webadmin-cassandra/pom.xml
@@ -41,6 +41,10 @@
             <artifactId>james-server-webadmin-cassandra</artifactId>
         </dependency>
         <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-data-cassandra</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
index 7513df4..01e4caa 100644
--- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
+++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
@@ -26,6 +26,7 @@ import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.mailbox.cassandra.mail.migration.AttachmentMessageIdCreation;
 import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration;
 import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV2Migration;
+import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes;
 import org.apache.james.webadmin.routes.CassandraMigrationRoutes;
@@ -41,6 +42,7 @@ public class CassandraRoutesModule extends AbstractModule {
     private static final SchemaVersion FROM_V3_TO_V4 = new SchemaVersion(3);
     private static final SchemaVersion FROM_V4_TO_V5 = new SchemaVersion(4);
     private static final SchemaVersion FROM_V5_TO_V6 = new SchemaVersion(5);
+    private static final SchemaVersion FROM_V6_TO_V7 = new SchemaVersion(6);
 
     @Override
     protected void configure() {
@@ -57,6 +59,7 @@ public class CassandraRoutesModule extends AbstractModule {
         allMigrationClazzBinder.addBinding(FROM_V3_TO_V4).to(AttachmentV2Migration.class);
         allMigrationClazzBinder.addBinding(FROM_V4_TO_V5).to(AttachmentMessageIdCreation.class);
         allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class);
+        allMigrationClazzBinder.addBinding(FROM_V6_TO_V7).to(MappingsSourcesMigration.class);
 
         bind(SchemaVersion.class)
             .annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION))

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/pom.xml b/server/data/data-cassandra/pom.xml
index 01780b5..bd3220a 100644
--- a/server/data/data-cassandra/pom.xml
+++ b/server/data/data-cassandra/pom.xml
@@ -150,6 +150,11 @@
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
index 777a50e..6ff3072 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
@@ -41,14 +41,14 @@ import com.datastax.driver.core.Session;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-class CassandraMappingsSourcesDAO {
+public class CassandraMappingsSourcesDAO {
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement insertStatement;
     private final PreparedStatement deleteStatement;
     private final PreparedStatement retrieveSourcesStatement;
 
     @Inject
-    CassandraMappingsSourcesDAO(Session session) {
+    public CassandraMappingsSourcesDAO(Session session) {
         this.executor = new CassandraAsyncExecutor(session);
         this.insertStatement = prepareInsertStatement(session);
         this.deleteStatement = prepareDelete(session);
@@ -77,7 +77,7 @@ class CassandraMappingsSourcesDAO {
             .and(eq(MAPPING_VALUE, bindMarker(MAPPING_VALUE))));
     }
 
-    Mono<Void> addMapping(Mapping mapping, MappingSource source) {
+    public Mono<Void> addMapping(Mapping mapping, MappingSource source) {
         return executor.executeVoidReactor(insertStatement.bind()
             .setString(MAPPING_TYPE, mapping.getType().asPrefix())
             .setString(MAPPING_VALUE, mapping.getMappingValue())
@@ -91,7 +91,7 @@ class CassandraMappingsSourcesDAO {
             .setString(SOURCE, source.asMailAddressString()));
     }
 
-    Flux<MappingSource> retrieveSources(Mapping mapping) {
+    public Flux<MappingSource> retrieveSources(Mapping mapping) {
         return executor.executeReactor(retrieveSourcesStatement.bind()
             .setString(MAPPING_TYPE, mapping.getType().asPrefix())
             .setString(MAPPING_VALUE, mapping.getMappingValue()))

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 4c7a92b..0a51ec1 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -30,6 +30,8 @@ import org.apache.james.rrt.lib.Mappings;
 import org.apache.james.rrt.lib.MappingsImpl;
 import org.apache.james.util.OptionalUtils;
 
+import com.github.steveash.guavate.Guavate;
+
 public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
     private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
     private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
@@ -63,7 +65,12 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl
 
     @Override
     public Map<MappingSource, Mappings> getAllMappings() {
-        return cassandraRecipientRewriteTableDAO.getAllMappings().block();
+        return cassandraRecipientRewriteTableDAO.getAllMappings() // Flux of (source, mapping) pairs
+            .collect(Guavate.toImmutableMap(
+                pair -> pair.getLeft(),
+                pair -> MappingsImpl.fromMappings(pair.getRight()),
+                Mappings::union)) // pairs sharing the same source are merged into one Mappings
+            .block(); // this API is synchronous: wait for the collected map
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
index 52f3516..fe31f1f 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -29,24 +29,23 @@ import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTab
 import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME;
 import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER;
 
-import java.util.Map;
-
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.rrt.lib.Mapping;
 import org.apache.james.rrt.lib.MappingSource;
-import org.apache.james.rrt.lib.Mappings;
 import org.apache.james.rrt.lib.MappingsImpl;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.github.steveash.guavate.Guavate;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-class CassandraRecipientRewriteTableDAO {
+public class CassandraRecipientRewriteTableDAO {
     private final CassandraAsyncExecutor executor;
     private final CassandraUtils cassandraUtils;
     private final PreparedStatement insertStatement;
@@ -55,7 +54,7 @@ class CassandraRecipientRewriteTableDAO {
     private final PreparedStatement retrieveAllMappingsStatement;
 
     @Inject
-    CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
+    public CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
         this.executor = new CassandraAsyncExecutor(session);
         this.cassandraUtils = cassandraUtils;
         this.insertStatement = prepareInsertStatement(session);
@@ -91,7 +90,7 @@ class CassandraRecipientRewriteTableDAO {
             .value(MAPPING, bindMarker(MAPPING)));
     }
 
-    Mono<Void> addMapping(MappingSource source, Mapping mapping) {
+    public Mono<Void> addMapping(MappingSource source, Mapping mapping) {
         return executor.executeVoidReactor(insertStatement.bind()
             .setString(USER, source.getFixedUser())
             .setString(DOMAIN, source.getFixedDomain())
@@ -116,35 +115,11 @@ class CassandraRecipientRewriteTableDAO {
             .filter(mappings -> !mappings.isEmpty());
     }
 
-    Mono<Map<MappingSource, Mappings>> getAllMappings() {
+    public Flux<Pair<MappingSource, Mapping>> getAllMappings() { // one (source, mapping) pair per row
         return executor.executeReactor(retrieveAllMappingsStatement.bind())
-            .map(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING)))
-                .collect(Guavate.toImmutableMap(
-                    UserMapping::getSource,
-                    UserMapping::toMapping,
-                    Mappings::union)));
-    }
-
-    private static class UserMapping {
-        private final MappingSource source;
-        private final String mapping;
-
-        UserMapping(MappingSource source, String mapping) {
-            this.source = source;
-            this.mapping = mapping;
-        }
-
-        MappingSource getSource() {
-            return source;
-        }
-
-        String getMapping() {
-            return mapping;
-        }
-
-        Mappings toMapping() {
-            return MappingsImpl.fromRawString(getMapping());
-        }
+            .flatMapMany(Flux::fromIterable) // NOTE(review): iterating the result set may fetch later pages synchronously - confirm
+            .map(row -> Pair.of(
+                MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)),
+                Mapping.of(row.getString(MAPPING))));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
new file mode 100644
index 0000000..226add2
--- /dev/null
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra.migration;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
+import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/** Feeds CassandraMappingsSourcesDAO with every pair read from CassandraRecipientRewriteTableDAO; errors map to PARTIAL. */
+public class MappingsSourcesMigration implements Migration {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MappingsSourcesMigration.class);
+    private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
+
+    @Inject
+    public MappingsSourcesMigration(CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO,
+                                    CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO) {
+        this.cassandraRecipientRewriteTableDAO = cassandraRecipientRewriteTableDAO;
+        this.cassandraMappingsSourcesDAO = cassandraMappingsSourcesDAO;
+    }
+
+    @Override
+    public Result run() {
+        return cassandraRecipientRewriteTableDAO.getAllMappings()
+            .flatMap(this::migrate)
+            .reduce(Result.COMPLETED, Task::combine)
+            .doOnError(e -> LOGGER.error("Error while migrating mappings sources", e))
+            .onErrorResume(e -> Mono.just(Result.PARTIAL))
+            .block();
+    }
+
+    private Mono<Result> migrate(Pair<MappingSource, Mapping> mappingEntry) {
+        return cassandraMappingsSourcesDAO.addMapping(mappingEntry.getRight(), mappingEntry.getLeft())
+            .then(Mono.just(Result.COMPLETED)) // Mono<Void> never emits, so map() here would be dead code
+            .doOnError(e -> LOGGER.error("Error while performing migration of mappings sources", e))
+            .onErrorResume(e -> Mono.just(Result.PARTIAL));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
index 5d0a125..b277a4e 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.rrt.lib.Mapping;
 import org.apache.james.rrt.lib.MappingSource;

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
index 3abc541..1654166 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
@@ -58,7 +58,7 @@ class CassandraRecipientRewriteTableDAOTest {
 
     @Test
     void getAllMappingsShouldReturnEmptyByDefault() {
-        assertThat(dao.getAllMappings().block()).isEmpty();
+        assertThat(dao.getAllMappings().collectList().block()).isEmpty();
     }
 
     @Test
@@ -72,7 +72,7 @@ class CassandraRecipientRewriteTableDAOTest {
     void getAllMappingsShouldReturnStoredMapping() {
         dao.addMapping(SOURCE, MAPPING).block();
 
-        assertThat(dao.getAllMappings().block()).contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING)));
+        assertThat(dao.getAllMappings().collectList().block()).contains(Pair.of(SOURCE, MAPPING));
     }
 
     @Test
@@ -90,7 +90,7 @@ class CassandraRecipientRewriteTableDAOTest {
 
         dao.removeMapping(SOURCE, MAPPING).block();
 
-        assertThat(dao.getAllMappings().block()).isEmpty();
+        assertThat(dao.getAllMappings().collectList().block()).isEmpty();
     }
 
     @Test
@@ -107,7 +107,7 @@ class CassandraRecipientRewriteTableDAOTest {
         dao.addMapping(SOURCE, MAPPING).block();
         dao.addMapping(SOURCE, MAPPING_2).block();
 
-        assertThat(dao.getAllMappings().block())
-            .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING, MAPPING_2)));
+        assertThat(dao.getAllMappings().collectList().block())
+            .contains(Pair.of(SOURCE, MAPPING), Pair.of(SOURCE, MAPPING_2));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
new file mode 100644
index 0000000..d2c1c69
--- /dev/null
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
@@ -0,0 +1,151 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.core.Domain;
+import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
+import org.apache.james.rrt.cassandra.CassandraRRTModule;
+import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.task.Task;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+
+/** Runs MappingsSourcesMigration against the Cassandra test cluster, and against mocked DAOs for the failure paths. */
+class MappingsSourcesMigrationTest {
+    private static final int THREAD_COUNT = 10;
+    private static final int OPERATION_COUNT = 10;
+    private static final int MAPPING_COUNT = 100;
+
+    private static final String USER = "test";
+    private static final String ADDRESS = "test@domain";
+    private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
+    private static final Mapping MAPPING = Mapping.alias(ADDRESS);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);
+
+    private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
+    private MappingsSourcesMigration migration;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        cassandraMappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandra.getConf());
+
+        migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
+    }
+
+    @Test
+    void emptyMigrationShouldSucceed() {
+        assertThat(migration.run()).isEqualTo(Migration.Result.COMPLETED);
+    }
+
+    @Test
+    void migrationShouldSucceedWithData() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        assertThat(migration.run()).isEqualTo(Task.Result.COMPLETED);
+    }
+
+    @Test
+    void migrationShouldCreateMappingSourceFromMapping() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        migration.run();
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .containsExactly(SOURCE);
+    }
+
+    @Test
+    void migrationShouldCreateMultipleMappingSourcesFromMappings() {
+        MappingSource source2 = MappingSource.fromUser("bob", Domain.LOCALHOST);
+
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+        cassandraRecipientRewriteTableDAO.addMapping(source2, MAPPING).block();
+
+        migration.run();
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .containsOnly(SOURCE, source2);
+    }
+
+    @Test
+    void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() {
+        CassandraRecipientRewriteTableDAO failingRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
+        CassandraMappingsSourcesDAO mockedSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
+        migration = new MappingsSourcesMigration(failingRewriteTableDAO, mockedSourcesDAO);
+
+        when(failingRewriteTableDAO.getAllMappings()).thenReturn(Flux.error(new RuntimeException()));
+
+        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+    }
+
+    @Test
+    void migrationShouldReturnPartialWhenAddMappingFails() {
+        CassandraRecipientRewriteTableDAO mockedRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
+        CassandraMappingsSourcesDAO failingSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
+        migration = new MappingsSourcesMigration(mockedRewriteTableDAO, failingSourcesDAO);
+
+        when(mockedRewriteTableDAO.getAllMappings())
+            .thenReturn(Flux.just(Pair.of(SOURCE, MAPPING)));
+        when(failingSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class)))
+            .thenThrow(new RuntimeException()); // the DAO call itself fails synchronously
+
+        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+    }
+
+    @Test
+    void migrationShouldBeIdempotentWhenRunMultipleTimes() throws ExecutionException, InterruptedException {
+        IntStream.range(0, MAPPING_COUNT)
+            .forEach(i -> cassandraRecipientRewriteTableDAO
+                .addMapping(MappingSource.parse("source" + i + "@domain"), MAPPING).block());
+
+        ConcurrentTestRunner.builder()
+            .operation((threadNumber, step) -> migration.run()) // repeated, concurrent migration runs
+            .threadCount(THREAD_COUNT)
+            .operationCount(OPERATION_COUNT)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .hasSize(MAPPING_COUNT);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org