You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/26 02:55:10 UTC

[james-project] 07/11: JAMES-3430 Provide migration for MessageV3 table

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5ab6e6d765a51193254ca1aab1d36a49d0279d0d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Oct 20 16:54:33 2020 +0700

    JAMES-3430 Provide migration for MessageV3 table
---
 .../versions/CassandraSchemaVersionManager.java    |   2 +-
 .../cassandra/mail/CassandraMessageDAO.java        |  33 +++++
 .../cassandra/mail/CassandraMessageDAOV3.java      |  43 ++++++
 .../cassandra/mail/MessageRepresentation.java      |  20 ++-
 .../mail/migration/MessageV3Migration.java         | 119 ++++++++++++++++
 ...ageV3MigrationTaskAdditionalInformationDTO.java |  69 +++++++++
 .../mail/migration/MessageV3MigrationTaskDTO.java  |  59 ++++++++
 .../cassandra/mail/CassandraMessageDAOTest.java    |   1 +
 .../MessageV3MigrationTaskSerializationTest.java   |  52 +++++++
 .../mail/migration/MessageV3MigrationTest.java     | 157 +++++++++++++++++++++
 .../mailbox/store/mail/model/impl/Properties.java  |  17 +++
 .../modules/webadmin/CassandraRoutesModule.java    |   3 +
 12 files changed, 570 insertions(+), 5 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index e38037a..29dae94 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -36,7 +36,7 @@ import reactor.core.publisher.Mono;
 
 public class CassandraSchemaVersionManager {
     public static final SchemaVersion MIN_VERSION = new SchemaVersion(5);
-    public static final SchemaVersion MAX_VERSION = new SchemaVersion(8);
+    public static final SchemaVersion MAX_VERSION = new SchemaVersion(9);
     public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class);
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 632af57..8d97ff5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -78,6 +78,7 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 
@@ -92,7 +93,9 @@ public class CassandraMessageDAO {
     private final PreparedStatement insert;
     private final PreparedStatement delete;
     private final PreparedStatement select;
+    private final PreparedStatement selectAll;
     private final Cid.CidParser cidParser;
+    private final CassandraMessageId.Factory messageIdFactory;
     private final ConsistencyLevel consistencyLevel;
 
     @Inject
@@ -100,8 +103,10 @@ public class CassandraMessageDAO {
                                CassandraTypesProvider typesProvider,
                                BlobStore blobStore,
                                BlobId.Factory blobIdFactory,
+                               CassandraMessageId.Factory messageIdFactory,
                                CassandraConsistenciesConfiguration consistenciesConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.messageIdFactory = messageIdFactory;
         this.consistencyLevel = consistenciesConfiguration.getRegular();
         this.typesProvider = typesProvider;
         this.blobStore = blobStore;
@@ -110,6 +115,7 @@ public class CassandraMessageDAO {
         this.insert = prepareInsert(session);
         this.delete = prepareDelete(session);
         this.select = prepareSelect(session);
+        this.selectAll = prepareSelectAll(session);
         this.cidParser = Cid.parser().relaxed();
     }
 
@@ -119,6 +125,11 @@ public class CassandraMessageDAO {
             .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
     }
 
+    private PreparedStatement prepareSelectAll(Session session) {
+        return session.prepare(select()
+            .from(TABLE_NAME));
+    }
+
     private PreparedStatement prepareInsert(Session session) {
         return session.prepare(insertInto(TABLE_NAME)
             .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
@@ -139,6 +150,11 @@ public class CassandraMessageDAO {
             .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
     }
 
+    public Flux<MessageRepresentation> list() {
+        return cassandraAsyncExecutor.executeRows(selectAll.bind())
+            .map(this::message);
+    }
+
     public Mono<Void> save(MailboxMessage message) throws MailboxException {
         return saveContent(message)
             .flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
@@ -244,6 +260,23 @@ public class CassandraMessageDAO {
                 bodyId));
     }
 
+    private MessageRepresentation message(Row row) {
+        BlobId headerId = retrieveBlobId(HEADER_CONTENT, row);
+        BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
+        CassandraMessageId messageId = messageIdFactory.of(row.getUUID(MESSAGE_ID));
+
+        return new MessageRepresentation(
+                messageId,
+                row.getTimestamp(INTERNAL_DATE),
+                row.getLong(FULL_CONTENT_OCTETS),
+                row.getInt(BODY_START_OCTET),
+                new SharedByteArrayInputStream(EMPTY_BYTE_ARRAY),
+                getProperties(row),
+                getAttachments(row).collect(Guavate.toImmutableList()),
+                headerId,
+                bodyId);
+    }
+
     private org.apache.james.mailbox.store.mail.model.impl.Properties getProperties(Row row) {
         PropertyBuilder property = new PropertyBuilder(
             row.getList(PROPERTIES, UDTValue.class).stream()
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 2c6b5ae..8f349da 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -159,6 +159,31 @@ public class CassandraMessageDAOV3 {
             .flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
     }
 
+    public Mono<Void> save(MessageRepresentation message) {
+        CassandraMessageId messageId = (CassandraMessageId) message.getMessageId();
+        return cassandraAsyncExecutor.executeVoid(insert.bind()
+            .setUUID(MESSAGE_ID, messageId.get())
+            .setTimestamp(INTERNAL_DATE, message.getInternalDate())
+            .setInt(BODY_START_OCTET, message.getBodyStartOctet())
+            .setLong(FULL_CONTENT_OCTETS, message.getSize())
+            .setLong(BODY_OCTECTS, message.getSize() - message.getBodyStartOctet())
+            .setString(BODY_CONTENT, message.getBodyId().asString())
+            .setString(HEADER_CONTENT, message.getHeaderId().asString())
+            .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getProperties().getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
+            .setString(CONTENT_DESCRIPTION, message.getProperties().getContentDescription())
+            .setString(CONTENT_DISPOSITION_TYPE, message.getProperties().getContentDispositionType())
+            .setString(MEDIA_TYPE, message.getProperties().getMediaType())
+            .setString(SUB_TYPE, message.getProperties().getSubType())
+            .setString(CONTENT_ID, message.getProperties().getContentID())
+            .setString(CONTENT_MD5, message.getProperties().getContentMD5())
+            .setString(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding())
+            .setString(CONTENT_LOCATION, message.getProperties().getContentLocation())
+            .setList(CONTENT_LANGUAGE, message.getProperties().getContentLanguage())
+            .setMap(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters())
+            .setMap(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters())
+            .setList(ATTACHMENTS, buildAttachmentUdt(message.getAttachments())));
+    }
+
     private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException {
         try {
             byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent());
@@ -204,6 +229,12 @@ public class CassandraMessageDAOV3 {
             .collect(Guavate.toImmutableList());
     }
 
+    private ImmutableList<UDTValue> buildAttachmentUdt(List<MessageAttachmentRepresentation> attachments) {
+        return attachments.stream()
+            .map(this::toUDT)
+            .collect(Guavate.toImmutableList());
+    }
+
     private UDTValue toUDT(MessageAttachmentMetadata messageAttachment) {
         UDTValue result = typesProvider.getDefinedUserType(ATTACHMENTS)
             .newValue()
@@ -216,6 +247,18 @@ public class CassandraMessageDAOV3 {
         return result;
     }
 
+    private UDTValue toUDT(MessageAttachmentRepresentation messageAttachment) {
+        UDTValue result = typesProvider.getDefinedUserType(ATTACHMENTS)
+            .newValue()
+            .setString(Attachments.ID, messageAttachment.getAttachmentId().getId())
+            .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
+        messageAttachment.getName()
+            .ifPresent(name -> result.setString(Attachments.NAME, name));
+        messageAttachment.getCid()
+            .ifPresent(cid -> result.setString(Attachments.CID, cid.getValue()));
+        return result;
+    }
+
     public Mono<MessageRepresentation> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
         CassandraMessageId cassandraMessageId = (CassandraMessageId) id.getComposedMessageId().getMessageId();
         return retrieveMessage(cassandraMessageId, fetchType);
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
index 19fd945..06274e5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
@@ -35,19 +35,19 @@ public class MessageRepresentation {
     private final MessageId messageId;
     private final Date internalDate;
     private final Long size;
-    private final Integer bodySize;
+    private final Integer bodyStartOctet;
     private final SharedByteArrayInputStream content;
     private final Properties properties;
     private final List<MessageAttachmentRepresentation> attachments;
     private final BlobId headerId;
     private final BlobId bodyId;
 
-    public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content,
+    public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodyStartOctet, SharedByteArrayInputStream content,
                                  Properties properties, List<MessageAttachmentRepresentation> attachments, BlobId headerId, BlobId bodyId) {
         this.messageId = messageId;
         this.internalDate = internalDate;
         this.size = size;
-        this.bodySize = bodySize;
+        this.bodyStartOctet = bodyStartOctet;
         this.content = content;
         this.properties = properties;
         this.attachments = attachments;
@@ -62,7 +62,7 @@ public class MessageRepresentation {
             .uid(metadata.getComposedMessageId().getUid())
             .modseq(metadata.getModSeq())
             .internalDate(internalDate)
-            .bodyStartOctet(bodySize)
+            .bodyStartOctet(bodyStartOctet)
             .size(size)
             .content(content)
             .flags(metadata.getFlags())
@@ -71,6 +71,18 @@ public class MessageRepresentation {
             .build();
     }
 
+    public Date getInternalDate() {
+        return internalDate;
+    }
+
+    public Long getSize() {
+        return size;
+    }
+
+    public Integer getBodyStartOctet() {
+        return bodyStartOctet;
+    }
+
     public MessageId getMessageId() {
         return messageId;
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java
new file mode 100644
index 0000000..2c37c4c
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java
@@ -0,0 +1,119 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
+import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+public class MessageV3Migration implements Migration {
+    private static final int CONCURRENCY = 50;
+
+    static class MessageV3MigrationTask implements Task {
+        private final MessageV3Migration migration;
+
+        MessageV3MigrationTask(MessageV3Migration migration) {
+            this.migration = migration;
+        }
+
+        @Override
+        public Result run() throws InterruptedException {
+            return migration.runTask();
+        }
+
+        @Override
+        public TaskType type() {
+            return TYPE;
+        }
+
+        @Override
+        public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+            return Optional.of(migration.getAdditionalInformation());
+        }
+    }
+
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+        private final Instant timestamp;
+
+        public AdditionalInformation(Instant timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+    }
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(MessageV3Migration.class);
+    public static final TaskType TYPE = TaskType.of("cassandra-message-v3-migration");
+    private final CassandraMessageDAO daoV2;
+    private final CassandraMessageDAOV3 daoV3;
+
+    @Inject
+    public MessageV3Migration(CassandraMessageDAO daoV2, CassandraMessageDAOV3 daoV3) {
+        this.daoV2 = daoV2;
+        this.daoV3 = daoV3;
+    }
+
+    @Override
+    public void apply() {
+        daoV2.list()
+            .flatMap(this::migrate, CONCURRENCY)
+            .doOnError(t -> LOGGER.error("Error while performing migration", t))
+            .blockLast();
+    }
+
+    private Mono<Void> migrate(MessageRepresentation messageRepresentation) {
+        return daoV3.save(messageRepresentation)
+            .then(daoV2.delete((CassandraMessageId) messageRepresentation.getMessageId()))
+            .onErrorResume(error -> handleErrorMigrate(messageRepresentation, error))
+            .then();
+    }
+
+    private Mono<Void> handleErrorMigrate(MessageRepresentation messageRepresentation, Throwable throwable) {
+        LOGGER.error("Error while performing migration for {}", messageRepresentation.getMessageId(), throwable);
+        return Mono.empty();
+    }
+
+    @Override
+    public Task asTask() {
+        return new MessageV3MigrationTask(this);
+    }
+
+    AdditionalInformation getAdditionalInformation() {
+        return new AdditionalInformation(Clock.systemUTC().instant());
+    }
+}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000..c5609cc
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class MessageV3MigrationTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+    private static MessageV3MigrationTaskAdditionalInformationDTO fromDomainObject(MessageV3Migration.AdditionalInformation additionalInformation, String type) {
+        return new MessageV3MigrationTaskAdditionalInformationDTO(
+            type,
+            additionalInformation.timestamp()
+        );
+    }
+
+    public static final AdditionalInformationDTOModule<MessageV3Migration.AdditionalInformation, MessageV3MigrationTaskAdditionalInformationDTO> MODULE =
+        DTOModule
+            .forDomainObject(MessageV3Migration.AdditionalInformation.class)
+            .convertToDTO(MessageV3MigrationTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(MessageV3MigrationTaskAdditionalInformationDTO::toDomainObject)
+            .toDTOConverter(MessageV3MigrationTaskAdditionalInformationDTO::fromDomainObject)
+            .typeName(MessageV3Migration.TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+
+    private final String type;
+    private final Instant timestamp;
+
+    public MessageV3MigrationTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+                                                          @JsonProperty("timestamp") Instant timestamp) {
+        this.type = type;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    private MessageV3Migration.AdditionalInformation toDomainObject() {
+        return new MessageV3Migration.AdditionalInformation(timestamp);
+    }
+}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java
new file mode 100644
index 0000000..87a1fec
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import java.util.function.Function;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class MessageV3MigrationTaskDTO implements TaskDTO {
+
+    private static MessageV3MigrationTaskDTO fromDomainObject(MessageV3Migration.MessageV3MigrationTask task, String type) {
+        return new MessageV3MigrationTaskDTO(type);
+    }
+
+    public static final Function<MessageV3Migration, TaskDTOModule<MessageV3Migration.MessageV3MigrationTask, MessageV3MigrationTaskDTO>> MODULE = (migration) ->
+        DTOModule
+            .forDomainObject(MessageV3Migration.MessageV3MigrationTask.class)
+            .convertToDTO(MessageV3MigrationTaskDTO.class)
+            .toDomainObjectConverter(dto -> dto.toDomainObject(migration))
+            .toDTOConverter(MessageV3MigrationTaskDTO::fromDomainObject)
+            .typeName(MessageV3Migration.TYPE.asString())
+            .withFactory(TaskDTOModule::new);
+
+    private final String type;
+
+    public MessageV3MigrationTaskDTO(@JsonProperty("type") String type) {
+        this.type = type;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    private MessageV3Migration.MessageV3MigrationTask toDomainObject(MessageV3Migration migration) {
+        return new MessageV3Migration.MessageV3MigrationTask(migration);
+    }
+}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index bb52781..5f044db 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -94,6 +94,7 @@ class CassandraMessageDAOTest {
             cassandra.getTypesProvider(),
             blobStore,
             blobIdFactory,
+            messageIdFactory,
             cassandraCluster.getCassandraConsistenciesConfiguration());
 
         messageIdWithMetadata = ComposedMessageIdWithMetaData.builder()
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java
new file mode 100644
index 0000000..1799543
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import static org.mockito.Mockito.mock;
+
+import java.time.Instant;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.junit.jupiter.api.Test;
+
+class MessageV3MigrationTaskSerializationTest {
+    private static final Instant TIMESTAMP = Instant.parse("2018-11-13T12:00:55Z");
+    private static final MessageV3Migration MIGRATION = mock(MessageV3Migration.class);
+    private static final MessageV3Migration.MessageV3MigrationTask TASK = new MessageV3Migration.MessageV3MigrationTask(MIGRATION);
+    private static final String SERIALIZED_TASK = "{\"type\": \"cassandra-message-v3-migration\"}";
+    private static final MessageV3Migration.AdditionalInformation DETAILS = new MessageV3Migration.AdditionalInformation(TIMESTAMP);
+    private static final String SERIALIZED_ADDITIONAL_INFORMATION = "{\"type\": \"cassandra-message-v3-migration\", \"timestamp\":\"2018-11-13T12:00:55Z\"}";
+
+    @Test
+    void taskShouldBeSerializable() throws Exception {
+        JsonSerializationVerifier.dtoModule(MessageV3MigrationTaskDTO.MODULE.apply(MIGRATION))
+            .bean(TASK)
+            .json(SERIALIZED_TASK)
+            .verify();
+    }
+
+    @Test
+    void additionalInformationShouldBeSerializable() throws Exception {
+        JsonSerializationVerifier.dtoModule(MessageV3MigrationTaskAdditionalInformationDTO.MODULE)
+            .bean(DETAILS)
+            .json(SERIALIZED_ADDITIONAL_INFORMATION)
+            .verify();
+    }
+}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java
new file mode 100644
index 0000000..51d9d31
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java
@@ -0,0 +1,157 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.List;
+
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
+import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
+import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
+import org.apache.james.mailbox.model.MessageAttachmentMetadata;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableList;
+
+/** Exercises {@link MessageV3Migration} with the V2 and V3 message DAOs set up on the Cassandra test extension. */
+class MessageV3MigrationTest {
+    private static final int BODY_START = 16;
+    private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
+    private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
+    private static final MessageUid MESSAGE_UID = MessageUid.of(1);
+    private static final List<MessageAttachmentMetadata> NO_ATTACHMENT = ImmutableList.of();
+
+    public static final CassandraModule MODULES = CassandraModule.aggregateModules(
+        CassandraMessageModule.MODULE,
+        CassandraBlobModule.MODULE,
+        CassandraSchemaVersionModule.MODULE);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULES);
+
+    private CassandraMessageDAO daoV2;
+    private CassandraMessageDAOV3 daoV3;
+    private CassandraMessageId.Factory messageIdFactory;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+            .passthrough();
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        messageIdFactory = new CassandraMessageId.Factory();
+        daoV2 = new CassandraMessageDAO(
+            cassandra.getConf(),
+            cassandra.getTypesProvider(),
+            blobStore,
+            blobIdFactory,
+            messageIdFactory,
+            cassandraCluster.getCassandraConsistenciesConfiguration());
+        daoV3 = new CassandraMessageDAOV3(
+            cassandra.getConf(),
+            cassandra.getTypesProvider(),
+            blobStore,
+            blobIdFactory,
+            cassandraCluster.getCassandraConsistenciesConfiguration());
+    }
+
+    @Test
+    void migrationTaskShouldMoveDataToMostRecentDao() throws Exception {
+        List<SimpleMailboxMessage> messages = ImmutableList.of(
+            createMessage(messageIdFactory.generate()),
+            createMessage(messageIdFactory.generate()),
+            createMessage(messageIdFactory.generate()),
+            createMessage(messageIdFactory.generate()));
+        for (SimpleMailboxMessage message : messages) {
+            daoV2.save(message).block();
+        }
+
+        new MessageV3Migration(daoV2, daoV3).apply();
+
+        SoftAssertions.assertSoftly(softly -> {
+            for (SimpleMailboxMessage message : messages) {
+                CassandraMessageId id = (CassandraMessageId) message.getMessageId();
+                softly.assertThat(daoV3.retrieveMessage(id, MessageMapper.FetchType.Metadata).block().getMessageId())
+                    .isEqualTo(id);
+            }
+            // The V2 DAO is expected to list nothing once the migration has run.
+            softly.assertThat(daoV2.list().collectList().block()).isEmpty();
+        });
+    }
+
+    @Test
+    void migrationTaskShouldPreserveMessageContent() throws Exception {
+        SimpleMailboxMessage message1 = createMessage(messageIdFactory.generate());
+        daoV2.save(message1).block();
+        // NOTE(review): both reads use FetchType.Metadata. If the DAOs skip body loading for that fetch type,
+        // the stream comparison below compares empty content; FetchType.Full may be what is intended - confirm.
+        MessageRepresentation original = daoV2.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block();
+
+        new MessageV3Migration(daoV2, daoV3).apply();
+        MessageRepresentation migrated = daoV3.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block();
+
+        int start = 0;
+        int end = -1;
+        assertThat(migrated).isEqualToComparingOnlyGivenFields(original, "messageId",
+            "internalDate", "size", "bodyStartOctet", "properties", "attachments", "headerId", "bodyId");
+        assertThat(migrated.getContent().newStream(start, end))
+            .hasSameContentAs(original.getContent().newStream(start, end));
+    }
+
+    /** Builds a message carrying {@code CONTENT} in {@code MAILBOX_ID}, with no flags, properties or attachments. */
+    private SimpleMailboxMessage createMessage(MessageId messageId) {
+        return SimpleMailboxMessage.builder()
+            .messageId(messageId)
+            .mailboxId(MAILBOX_ID)
+            .uid(MESSAGE_UID)
+            .internalDate(new Date())
+            .bodyStartOctet(BODY_START)
+            .size(CONTENT.length())
+            .content(new SharedByteArrayInputStream(CONTENT.getBytes(StandardCharsets.UTF_8)))
+            .flags(new Flags())
+            .properties(new PropertyBuilder().build())
+            .addAttachments(NO_ATTACHMENT)
+            .build();
+    }
+}
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java
index 5754b71..6b63d07 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java
@@ -41,6 +41,7 @@ import static org.apache.james.mailbox.store.mail.model.StandardNames.MIME_SUB_T
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -222,6 +223,22 @@ public class Properties {
         return new ArrayList<>(properties);
     }
 
+    @Override
+    public final boolean equals(Object o) {
+        if (!(o instanceof Properties)) {
+            return false;
+        }
+        // Equality is defined by the textual line count together with the property list.
+        Properties other = (Properties) o;
+        return Objects.equals(textualLineCount, other.textualLineCount)
+            && Objects.equals(properties, other.properties);
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(this.textualLineCount, this.properties); // the same two fields equals compares
+    }
+
     /**
      * Constructs a <code>String</code> with all attributes
      * in name = value format.
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java
index 90e8911..ff0f783 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java
@@ -27,6 +27,7 @@ import org.apache.james.backends.cassandra.versions.SchemaTransition;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV2Migration;
 import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV3Migration;
+import org.apache.james.mailbox.cassandra.mail.migration.MessageV3Migration;
 import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes;
@@ -42,6 +43,7 @@ public class CassandraRoutesModule extends AbstractModule {
     private static final SchemaTransition FROM_V5_TO_V6 = SchemaTransition.to(new SchemaVersion(6));
     private static final SchemaTransition FROM_V6_TO_V7 = SchemaTransition.to(new SchemaVersion(7));
     private static final SchemaTransition FROM_V7_TO_V8 = SchemaTransition.to(new SchemaVersion(8));
+    private static final SchemaTransition FROM_V8_TO_V9 = SchemaTransition.to(new SchemaVersion(9));
 
     @Override
     protected void configure() {
@@ -60,6 +62,7 @@ public class CassandraRoutesModule extends AbstractModule {
         allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class);
         allMigrationClazzBinder.addBinding(FROM_V6_TO_V7).to(MappingsSourcesMigration.class);
         allMigrationClazzBinder.addBinding(FROM_V7_TO_V8).to(MailboxPathV3Migration.class);
+        allMigrationClazzBinder.addBinding(FROM_V8_TO_V9).to(MessageV3Migration.class);
 
         bind(SchemaVersion.class)
             .annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org