You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2017/09/13 10:57:29 UTC

[03/24] james-project git commit: MAILBOX-304 Create and implement migration task

MAILBOX-304 Create and implement migration task


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2001443b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2001443b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2001443b

Branch: refs/heads/master
Commit: 2001443b951ee46828b68331e75f9a0708452fde
Parents: 8f1c35b
Author: benwa <bt...@linagora.com>
Authored: Fri Sep 8 18:28:30 2017 +0700
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Wed Sep 13 10:17:09 2017 +0200

----------------------------------------------------------------------
 .../versions/CassandraSchemaVersionManager.java |   2 +-
 .../mail/migration/AttachmentV2Migration.java   |  71 ++++++
 .../migration/AttachmentV2MigrationTest.java    | 219 +++++++++++++++++++
 .../modules/mailbox/CassandraMailboxModule.java |   4 +
 .../modules/server/CassandraRoutesModule.java   |   3 +
 5 files changed, 298 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/2001443b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index 79072b7..663db63 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -34,7 +34,7 @@ import com.google.common.base.Preconditions;
 
 public class CassandraSchemaVersionManager {
     public static final int MIN_VERSION = 2;
-    public static final int MAX_VERSION = 3;
+    public static final int MAX_VERSION = 4;
     public static final int DEFAULT_VERSION = 2;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/2001443b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
new file mode 100644
index 0000000..8290deb
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -0,0 +1,71 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import javax.inject.Inject;
+
+import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
+import org.apache.james.mailbox.model.Attachment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AttachmentV2Migration implements Migration {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AttachmentV2Migration.class);
+    private final CassandraAttachmentDAO attachmentDAOV1;
+    private final CassandraAttachmentDAOV2 attachmentDAOV2;
+    private final CassandraBlobsDAO blobsDAO;
+
+    @Inject
+    public AttachmentV2Migration(CassandraAttachmentDAO attachmentDAOV1,
+                                 CassandraAttachmentDAOV2 attachmentDAOV2,
+                                 CassandraBlobsDAO blobsDAO) {
+        this.attachmentDAOV1 = attachmentDAOV1;
+        this.attachmentDAOV2 = attachmentDAOV2;
+        this.blobsDAO = blobsDAO;
+    }
+
+    @Override
+    public MigrationResult run() {
+        try {
+            return attachmentDAOV1.retrieveAll()
+                .map(this::migrateAttachment)
+                .reduce(MigrationResult.COMPLETED, Migration::combine);
+        } catch (Exception e) {
+            LOGGER.error("Error while performing attachmentDAO V2 migration", e);
+            return MigrationResult.PARTIAL;
+        }
+    }
+
+    private MigrationResult migrateAttachment(Attachment attachment) {
+        try {
+            blobsDAO.save(attachment.getBytes())
+                .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
+                .thenCompose(attachmentDAOV2::storeAttachment)
+                .thenCompose(any -> attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()))
+                .join();
+            return MigrationResult.COMPLETED;
+        } catch (Exception e) {
+            LOGGER.error("Error while performing attachmentDAO V2 migration", e);
+            return MigrationResult.PARTIAL;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/2001443b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
new file mode 100644
index 0000000..7bbbb39
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -0,0 +1,219 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.DockerCassandraRule;
+import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.mailbox.cassandra.ids.BlobId;
+import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
+import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
+import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
+import org.apache.james.mailbox.model.Attachment;
+import org.apache.james.mailbox.model.AttachmentId;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+public class AttachmentV2MigrationTest {
+    public static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1");
+    public static final AttachmentId ATTACHMENT_ID_2 = AttachmentId.from("id2");
+
+    @ClassRule
+    public static DockerCassandraRule cassandraServer = new DockerCassandraRule();
+
+    private CassandraAttachmentDAO attachmentDAO;
+    private CassandraAttachmentDAOV2 attachmentDAOV2;
+    private CassandraBlobsDAO blobsDAO;
+    private AttachmentV2Migration migration;
+    private Attachment attachment1;
+    private Attachment attachment2;
+
+    @Before
+    public void setUp() {
+        CassandraCluster cassandra = CassandraCluster.create(
+            new CassandraModuleComposite(
+                new CassandraAttachmentModule(),
+                new CassandraBlobModule()),
+            cassandraServer.getIp(),
+            cassandraServer.getBindingPort());
+
+        attachmentDAO = new CassandraAttachmentDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        attachmentDAOV2 = new CassandraAttachmentDAOV2(cassandra.getConf());
+        blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
+
+        migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsDAO);
+
+        attachment1 = Attachment.builder()
+            .attachmentId(ATTACHMENT_ID)
+            .type("application/json")
+            .bytes("{\"property\":`\"value1\"}".getBytes(StandardCharsets.UTF_8))
+            .build();
+        attachment2 = Attachment.builder()
+            .attachmentId(ATTACHMENT_ID_2)
+            .type("application/json")
+            .bytes("{\"property\":`\"value2\"}".getBytes(StandardCharsets.UTF_8))
+            .build();
+    }
+
+    @Test
+    public void emptyMigrationShouldSucceed() {
+        assertThat(migration.run())
+            .isEqualTo(Migration.MigrationResult.COMPLETED);
+    }
+
+    @Test
+    public void migrationShouldSucceed() throws Exception {
+        attachmentDAO.storeAttachment(attachment1).join();
+        attachmentDAO.storeAttachment(attachment2).join();
+
+        assertThat(migration.run())
+            .isEqualTo(Migration.MigrationResult.COMPLETED);
+    }
+
+    @Test
+    public void migrationShouldMoveAttachmentsToV2() throws Exception {
+        attachmentDAO.storeAttachment(attachment1).join();
+        attachmentDAO.storeAttachment(attachment2).join();
+
+        migration.run();
+
+        assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).join())
+            .contains(CassandraAttachmentDAOV2.from(attachment1, BlobId.forPayload(attachment1.getBytes())));
+        assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).join())
+            .contains(CassandraAttachmentDAOV2.from(attachment2, BlobId.forPayload(attachment2.getBytes())));
+        assertThat(blobsDAO.read(BlobId.forPayload(attachment1.getBytes())).join())
+            .isEqualTo(attachment1.getBytes());
+        assertThat(blobsDAO.read(BlobId.forPayload(attachment2.getBytes())).join())
+            .isEqualTo(attachment2.getBytes());
+    }
+
+    @Test
+    public void migrationShouldRemoveAttachmentsFromV1() throws Exception {
+        attachmentDAO.storeAttachment(attachment1).join();
+        attachmentDAO.storeAttachment(attachment2).join();
+
+        migration.run();
+
+        assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).join())
+            .isEmpty();
+        assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID_2).join())
+            .isEmpty();
+    }
+
+    @Test
+    public void runShouldReturnPartialWhenInitialReadFail() throws Exception {
+        CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class);
+        CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class);
+        CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
+        migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsDAO);
+
+        when(attachmentDAO.retrieveAll()).thenThrow(new RuntimeException());
+
+        assertThat(migration.run()).isEqualTo(Migration.MigrationResult.PARTIAL);
+    }
+
+    @Test
+    public void runShouldReturnPartialWhenSavingBlobsFails() throws Exception {
+        CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class);
+        CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class);
+        CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
+        migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsDAO);
+
+        when(attachmentDAO.retrieveAll()).thenReturn(Stream.of(
+            attachment1,
+            attachment2));
+        when(blobsDAO.save(any())).thenThrow(new RuntimeException());
+
+        assertThat(migration.run()).isEqualTo(Migration.MigrationResult.PARTIAL);
+    }
+
+    @Test
+    public void runShouldReturnPartialWhenSavingAttachmentV2Fail() throws Exception {
+        CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class);
+        CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class);
+        CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
+        migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsDAO);
+
+        when(attachmentDAO.retrieveAll()).thenReturn(Stream.of(
+            attachment1,
+            attachment2));
+        when(blobsDAO.save(attachment1.getBytes()))
+            .thenReturn(CompletableFuture.completedFuture(BlobId.forPayload(attachment1.getBytes())));
+        when(blobsDAO.save(attachment2.getBytes()))
+            .thenReturn(CompletableFuture.completedFuture(BlobId.forPayload(attachment2.getBytes())));
+        when(attachmentDAOV2.storeAttachment(any())).thenThrow(new RuntimeException());
+
+        assertThat(migration.run()).isEqualTo(Migration.MigrationResult.PARTIAL);
+    }
+
+    @Test
+    public void runShouldReturnPartialWhenDeleteV1AttachmentFail() throws Exception {
+        CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class);
+        CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class);
+        CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
+        migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsDAO);
+
+        when(attachmentDAO.retrieveAll()).thenReturn(Stream.of(
+            attachment1,
+            attachment2));
+        when(blobsDAO.save(attachment1.getBytes()))
+            .thenReturn(CompletableFuture.completedFuture(BlobId.forPayload(attachment1.getBytes())));
+        when(blobsDAO.save(attachment2.getBytes()))
+            .thenReturn(CompletableFuture.completedFuture(BlobId.forPayload(attachment2.getBytes())));
+        when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException());
+
+        assertThat(migration.run()).isEqualTo(Migration.MigrationResult.PARTIAL);
+    }
+
+    @Test
+    public void runShouldReturnPartialWhenAtLeastOneAttachmentMigrationFails() throws Exception {
+        CassandraAttachmentDAO attachmentDAO = mock(CassandraAttachmentDAO.class);
+        CassandraAttachmentDAOV2 attachmentDAOV2 = mock(CassandraAttachmentDAOV2.class);
+        CassandraBlobsDAO blobsDAO = mock(CassandraBlobsDAO.class);
+        migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsDAO);
+
+        when(attachmentDAO.retrieveAll()).thenReturn(Stream.of(
+            attachment1,
+            attachment2));
+        when(blobsDAO.save(attachment1.getBytes()))
+            .thenReturn(CompletableFuture.completedFuture(BlobId.forPayload(attachment1.getBytes())));
+        when(blobsDAO.save(attachment2.getBytes()))
+            .thenThrow(new RuntimeException());
+        when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(attachmentDAO.deleteAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
+
+        assertThat(migration.run()).isEqualTo(Migration.MigrationResult.PARTIAL);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/2001443b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
index ae0b844..732b3ff 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
@@ -33,6 +33,8 @@ import org.apache.james.mailbox.cassandra.CassandraMailboxSessionMapperFactory;
 import org.apache.james.mailbox.cassandra.CassandraSubscriptionManager;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
 import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
@@ -94,6 +96,8 @@ public class CassandraMailboxModule extends AbstractModule {
         bind(CassandraMailboxMapper.class).in(Scopes.SINGLETON);
         bind(CassandraMessageDAO.class).in(Scopes.SINGLETON);
         bind(CassandraBlobsDAO.class).in(Scopes.SINGLETON);
+        bind(CassandraAttachmentDAO.class).in(Scopes.SINGLETON);
+        bind(CassandraAttachmentDAOV2.class).in(Scopes.SINGLETON);
 
         bind(MessageMapperFactory.class).to(CassandraMailboxSessionMapperFactory.class);
         bind(MailboxMapperFactory.class).to(CassandraMailboxSessionMapperFactory.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/2001443b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
index 55b8c23..96f807c 100644
--- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
+++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
@@ -20,6 +20,7 @@
 package org.apache.james.modules.server;
 
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
+import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration;
 import org.apache.james.mailbox.cassandra.mail.migration.Migration;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.routes.CassandraMigrationRoutes;
@@ -33,6 +34,7 @@ import com.google.inject.name.Names;
 
 public class CassandraRoutesModule extends AbstractModule {
     private static final int FROM_V2_TO_V3 = 2;
+    private static final int FROM_V3_TO_V4 = 3;
 
     @Override
     protected void configure() {
@@ -44,6 +46,7 @@ public class CassandraRoutesModule extends AbstractModule {
 
         MapBinder<Integer, Migration> allMigrationClazzBinder = MapBinder.newMapBinder(binder(), Integer.class, Migration.class);
         allMigrationClazzBinder.addBinding(FROM_V2_TO_V3).toInstance(() -> Migration.MigrationResult.COMPLETED);
+        allMigrationClazzBinder.addBinding(FROM_V3_TO_V4).to(AttachmentV2Migration.class);
 
         bindConstant()
             .annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION))


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org