You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/26 02:55:10 UTC
[james-project] 07/11: JAMES-3430 Provide migration for MessageV3
table
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5ab6e6d765a51193254ca1aab1d36a49d0279d0d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Oct 20 16:54:33 2020 +0700
JAMES-3430 Provide migration for MessageV3 table
---
.../versions/CassandraSchemaVersionManager.java | 2 +-
.../cassandra/mail/CassandraMessageDAO.java | 33 +++++
.../cassandra/mail/CassandraMessageDAOV3.java | 43 ++++++
.../cassandra/mail/MessageRepresentation.java | 20 ++-
.../mail/migration/MessageV3Migration.java | 119 ++++++++++++++++
...ageV3MigrationTaskAdditionalInformationDTO.java | 69 +++++++++
.../mail/migration/MessageV3MigrationTaskDTO.java | 59 ++++++++
.../cassandra/mail/CassandraMessageDAOTest.java | 1 +
.../MessageV3MigrationTaskSerializationTest.java | 52 +++++++
.../mail/migration/MessageV3MigrationTest.java | 157 +++++++++++++++++++++
.../mailbox/store/mail/model/impl/Properties.java | 17 +++
.../modules/webadmin/CassandraRoutesModule.java | 3 +
12 files changed, 570 insertions(+), 5 deletions(-)
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index e38037a..29dae94 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -36,7 +36,7 @@ import reactor.core.publisher.Mono;
public class CassandraSchemaVersionManager {
public static final SchemaVersion MIN_VERSION = new SchemaVersion(5);
- public static final SchemaVersion MAX_VERSION = new SchemaVersion(8);
+ public static final SchemaVersion MAX_VERSION = new SchemaVersion(9);
public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION;
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class);
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 632af57..8d97ff5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -78,6 +78,7 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@@ -92,7 +93,9 @@ public class CassandraMessageDAO {
private final PreparedStatement insert;
private final PreparedStatement delete;
private final PreparedStatement select;
+ private final PreparedStatement selectAll;
private final Cid.CidParser cidParser;
+ private final CassandraMessageId.Factory messageIdFactory;
private final ConsistencyLevel consistencyLevel;
@Inject
@@ -100,8 +103,10 @@ public class CassandraMessageDAO {
CassandraTypesProvider typesProvider,
BlobStore blobStore,
BlobId.Factory blobIdFactory,
+ CassandraMessageId.Factory messageIdFactory,
CassandraConsistenciesConfiguration consistenciesConfiguration) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+ this.messageIdFactory = messageIdFactory;
this.consistencyLevel = consistenciesConfiguration.getRegular();
this.typesProvider = typesProvider;
this.blobStore = blobStore;
@@ -110,6 +115,7 @@ public class CassandraMessageDAO {
this.insert = prepareInsert(session);
this.delete = prepareDelete(session);
this.select = prepareSelect(session);
+ this.selectAll = prepareSelectAll(session);
this.cidParser = Cid.parser().relaxed();
}
@@ -119,6 +125,11 @@ public class CassandraMessageDAO {
.where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
}
+ private PreparedStatement prepareSelectAll(Session session) {
+ return session.prepare(select()
+ .from(TABLE_NAME));
+ }
+
private PreparedStatement prepareInsert(Session session) {
return session.prepare(insertInto(TABLE_NAME)
.value(MESSAGE_ID, bindMarker(MESSAGE_ID))
@@ -139,6 +150,11 @@ public class CassandraMessageDAO {
.where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
}
+ public Flux<MessageRepresentation> list() {
+ return cassandraAsyncExecutor.executeRows(selectAll.bind())
+ .map(this::message);
+ }
+
public Mono<Void> save(MailboxMessage message) throws MailboxException {
return saveContent(message)
.flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
@@ -244,6 +260,23 @@ public class CassandraMessageDAO {
bodyId));
}
+ private MessageRepresentation message(Row row) {
+ BlobId headerId = retrieveBlobId(HEADER_CONTENT, row);
+ BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
+ CassandraMessageId messageId = messageIdFactory.of(row.getUUID(MESSAGE_ID));
+
+ return new MessageRepresentation(
+ messageId,
+ row.getTimestamp(INTERNAL_DATE),
+ row.getLong(FULL_CONTENT_OCTETS),
+ row.getInt(BODY_START_OCTET),
+ new SharedByteArrayInputStream(EMPTY_BYTE_ARRAY),
+ getProperties(row),
+ getAttachments(row).collect(Guavate.toImmutableList()),
+ headerId,
+ bodyId);
+ }
+
private org.apache.james.mailbox.store.mail.model.impl.Properties getProperties(Row row) {
PropertyBuilder property = new PropertyBuilder(
row.getList(PROPERTIES, UDTValue.class).stream()
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 2c6b5ae..8f349da 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -159,6 +159,31 @@ public class CassandraMessageDAOV3 {
.flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
}
+ public Mono<Void> save(MessageRepresentation message) {
+ CassandraMessageId messageId = (CassandraMessageId) message.getMessageId();
+ return cassandraAsyncExecutor.executeVoid(insert.bind()
+ .setUUID(MESSAGE_ID, messageId.get())
+ .setTimestamp(INTERNAL_DATE, message.getInternalDate())
+ .setInt(BODY_START_OCTET, message.getBodyStartOctet())
+ .setLong(FULL_CONTENT_OCTETS, message.getSize())
+ .setLong(BODY_OCTECTS, message.getSize() - message.getBodyStartOctet())
+ .setString(BODY_CONTENT, message.getBodyId().asString())
+ .setString(HEADER_CONTENT, message.getHeaderId().asString())
+ .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getProperties().getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
+ .setString(CONTENT_DESCRIPTION, message.getProperties().getContentDescription())
+ .setString(CONTENT_DISPOSITION_TYPE, message.getProperties().getContentDispositionType())
+ .setString(MEDIA_TYPE, message.getProperties().getMediaType())
+ .setString(SUB_TYPE, message.getProperties().getSubType())
+ .setString(CONTENT_ID, message.getProperties().getContentID())
+ .setString(CONTENT_MD5, message.getProperties().getContentMD5())
+ .setString(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding())
+ .setString(CONTENT_LOCATION, message.getProperties().getContentLocation())
+ .setList(CONTENT_LANGUAGE, message.getProperties().getContentLanguage())
+ .setMap(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters())
+ .setMap(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters())
+ .setList(ATTACHMENTS, buildAttachmentUdt(message.getAttachments())));
+ }
+
private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException {
try {
byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent());
@@ -204,6 +229,12 @@ public class CassandraMessageDAOV3 {
.collect(Guavate.toImmutableList());
}
+ private ImmutableList<UDTValue> buildAttachmentUdt(List<MessageAttachmentRepresentation> attachments) {
+ return attachments.stream()
+ .map(this::toUDT)
+ .collect(Guavate.toImmutableList());
+ }
+
private UDTValue toUDT(MessageAttachmentMetadata messageAttachment) {
UDTValue result = typesProvider.getDefinedUserType(ATTACHMENTS)
.newValue()
@@ -216,6 +247,18 @@ public class CassandraMessageDAOV3 {
return result;
}
+ private UDTValue toUDT(MessageAttachmentRepresentation messageAttachment) {
+ UDTValue result = typesProvider.getDefinedUserType(ATTACHMENTS)
+ .newValue()
+ .setString(Attachments.ID, messageAttachment.getAttachmentId().getId())
+ .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
+ messageAttachment.getName()
+ .ifPresent(name -> result.setString(Attachments.NAME, name));
+ messageAttachment.getCid()
+ .ifPresent(cid -> result.setString(Attachments.CID, cid.getValue()));
+ return result;
+ }
+
public Mono<MessageRepresentation> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
CassandraMessageId cassandraMessageId = (CassandraMessageId) id.getComposedMessageId().getMessageId();
return retrieveMessage(cassandraMessageId, fetchType);
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
index 19fd945..06274e5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
@@ -35,19 +35,19 @@ public class MessageRepresentation {
private final MessageId messageId;
private final Date internalDate;
private final Long size;
- private final Integer bodySize;
+ private final Integer bodyStartOctet;
private final SharedByteArrayInputStream content;
private final Properties properties;
private final List<MessageAttachmentRepresentation> attachments;
private final BlobId headerId;
private final BlobId bodyId;
- public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content,
+ public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodyStartOctet, SharedByteArrayInputStream content,
Properties properties, List<MessageAttachmentRepresentation> attachments, BlobId headerId, BlobId bodyId) {
this.messageId = messageId;
this.internalDate = internalDate;
this.size = size;
- this.bodySize = bodySize;
+ this.bodyStartOctet = bodyStartOctet;
this.content = content;
this.properties = properties;
this.attachments = attachments;
@@ -62,7 +62,7 @@ public class MessageRepresentation {
.uid(metadata.getComposedMessageId().getUid())
.modseq(metadata.getModSeq())
.internalDate(internalDate)
- .bodyStartOctet(bodySize)
+ .bodyStartOctet(bodyStartOctet)
.size(size)
.content(content)
.flags(metadata.getFlags())
@@ -71,6 +71,18 @@ public class MessageRepresentation {
.build();
}
+ public Date getInternalDate() {
+ return internalDate;
+ }
+
+ public Long getSize() {
+ return size;
+ }
+
+ public Integer getBodyStartOctet() {
+ return bodyStartOctet;
+ }
+
public MessageId getMessageId() {
return messageId;
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java
new file mode 100644
index 0000000..2c37c4c
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java
@@ -0,0 +1,119 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
+import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+public class MessageV3Migration implements Migration {
+ private static final int CONCURRENCY = 50;
+
+ static class MessageV3MigrationTask implements Task {
+ private final MessageV3Migration migration;
+
+ MessageV3MigrationTask(MessageV3Migration migration) {
+ this.migration = migration;
+ }
+
+ @Override
+ public Result run() throws InterruptedException {
+ return migration.runTask();
+ }
+
+ @Override
+ public TaskType type() {
+ return TYPE;
+ }
+
+ @Override
+ public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+ return Optional.of(migration.getAdditionalInformation());
+ }
+ }
+
+ public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+ private final Instant timestamp;
+
+ public AdditionalInformation(Instant timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+ }
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(MessageV3Migration.class);
+ public static final TaskType TYPE = TaskType.of("cassandra-message-v3-migration");
+ private final CassandraMessageDAO daoV2;
+ private final CassandraMessageDAOV3 daoV3;
+
+ @Inject
+ public MessageV3Migration(CassandraMessageDAO daoV2, CassandraMessageDAOV3 daoV3) {
+ this.daoV2 = daoV2;
+ this.daoV3 = daoV3;
+ }
+
+ @Override
+ public void apply() {
+ daoV2.list()
+ .flatMap(this::migrate, CONCURRENCY)
+ .doOnError(t -> LOGGER.error("Error while performing migration", t))
+ .blockLast();
+ }
+
+ private Mono<Void> migrate(MessageRepresentation messageRepresentation) {
+ return daoV3.save(messageRepresentation)
+ .then(daoV2.delete((CassandraMessageId) messageRepresentation.getMessageId()))
+ .onErrorResume(error -> handleErrorMigrate(messageRepresentation, error))
+ .then();
+ }
+
+ private Mono<Void> handleErrorMigrate(MessageRepresentation messageRepresentation, Throwable throwable) {
+ LOGGER.error("Error while performing migration for {}", messageRepresentation.getMessageId(), throwable);
+ return Mono.empty();
+ }
+
+ @Override
+ public Task asTask() {
+ return new MessageV3MigrationTask(this);
+ }
+
+ AdditionalInformation getAdditionalInformation() {
+ return new AdditionalInformation(Clock.systemUTC().instant());
+ }
+}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000..c5609cc
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class MessageV3MigrationTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+ private static MessageV3MigrationTaskAdditionalInformationDTO fromDomainObject(MessageV3Migration.AdditionalInformation additionalInformation, String type) {
+ return new MessageV3MigrationTaskAdditionalInformationDTO(
+ type,
+ additionalInformation.timestamp()
+ );
+ }
+
+ public static final AdditionalInformationDTOModule<MessageV3Migration.AdditionalInformation, MessageV3MigrationTaskAdditionalInformationDTO> MODULE =
+ DTOModule
+ .forDomainObject(MessageV3Migration.AdditionalInformation.class)
+ .convertToDTO(MessageV3MigrationTaskAdditionalInformationDTO.class)
+ .toDomainObjectConverter(MessageV3MigrationTaskAdditionalInformationDTO::toDomainObject)
+ .toDTOConverter(MessageV3MigrationTaskAdditionalInformationDTO::fromDomainObject)
+ .typeName(MessageV3Migration.TYPE.asString())
+ .withFactory(AdditionalInformationDTOModule::new);
+
+ private final String type;
+ private final Instant timestamp;
+
+ public MessageV3MigrationTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+ @JsonProperty("timestamp") Instant timestamp) {
+ this.type = type;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ private MessageV3Migration.AdditionalInformation toDomainObject() {
+ return new MessageV3Migration.AdditionalInformation(timestamp);
+ }
+}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java
new file mode 100644
index 0000000..87a1fec
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import java.util.function.Function;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class MessageV3MigrationTaskDTO implements TaskDTO {
+
+ private static MessageV3MigrationTaskDTO fromDomainObject(MessageV3Migration.MessageV3MigrationTask task, String type) {
+ return new MessageV3MigrationTaskDTO(type);
+ }
+
+ public static final Function<MessageV3Migration, TaskDTOModule<MessageV3Migration.MessageV3MigrationTask, MessageV3MigrationTaskDTO>> MODULE = (migration) ->
+ DTOModule
+ .forDomainObject(MessageV3Migration.MessageV3MigrationTask.class)
+ .convertToDTO(MessageV3MigrationTaskDTO.class)
+ .toDomainObjectConverter(dto -> dto.toDomainObject(migration))
+ .toDTOConverter(MessageV3MigrationTaskDTO::fromDomainObject)
+ .typeName(MessageV3Migration.TYPE.asString())
+ .withFactory(TaskDTOModule::new);
+
+ private final String type;
+
+ public MessageV3MigrationTaskDTO(@JsonProperty("type") String type) {
+ this.type = type;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ private MessageV3Migration.MessageV3MigrationTask toDomainObject(MessageV3Migration migration) {
+ return new MessageV3Migration.MessageV3MigrationTask(migration);
+ }
+}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index bb52781..5f044db 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -94,6 +94,7 @@ class CassandraMessageDAOTest {
cassandra.getTypesProvider(),
blobStore,
blobIdFactory,
+ messageIdFactory,
cassandraCluster.getCassandraConsistenciesConfiguration());
messageIdWithMetadata = ComposedMessageIdWithMetaData.builder()
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java
new file mode 100644
index 0000000..1799543
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import static org.mockito.Mockito.mock;
+
+import java.time.Instant;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.junit.jupiter.api.Test;
+
+class MessageV3MigrationTaskSerializationTest {
+ private static final Instant TIMESTAMP = Instant.parse("2018-11-13T12:00:55Z");
+ private static final MessageV3Migration MIGRATION = mock(MessageV3Migration.class);
+ private static final MessageV3Migration.MessageV3MigrationTask TASK = new MessageV3Migration.MessageV3MigrationTask(MIGRATION);
+ private static final String SERIALIZED_TASK = "{\"type\": \"cassandra-message-v3-migration\"}";
+ private static final MessageV3Migration.AdditionalInformation DETAILS = new MessageV3Migration.AdditionalInformation(TIMESTAMP);
+ private static final String SERIALIZED_ADDITIONAL_INFORMATION = "{\"type\": \"cassandra-message-v3-migration\", \"timestamp\":\"2018-11-13T12:00:55Z\"}";
+
+ @Test
+ void taskShouldBeSerializable() throws Exception {
+ JsonSerializationVerifier.dtoModule(MessageV3MigrationTaskDTO.MODULE.apply(MIGRATION))
+ .bean(TASK)
+ .json(SERIALIZED_TASK)
+ .verify();
+ }
+
+ @Test
+ void additionalInformationShouldBeSerializable() throws Exception {
+ JsonSerializationVerifier.dtoModule(MessageV3MigrationTaskAdditionalInformationDTO.MODULE)
+ .bean(DETAILS)
+ .json(SERIALIZED_ADDITIONAL_INFORMATION)
+ .verify();
+ }
+}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java
new file mode 100644
index 0000000..51d9d31
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java
@@ -0,0 +1,157 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.List;
+
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
+import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
+import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
+import org.apache.james.mailbox.model.MessageAttachmentMetadata;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableList;
+
+class MessageV3MigrationTest {
+ private static final int BODY_START = 16;
+ private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
+ private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
+ private static final MessageUid messageUid = MessageUid.of(1);
+ private static final List<MessageAttachmentMetadata> NO_ATTACHMENT = ImmutableList.of();
+
+ public static final CassandraModule MODULES = CassandraModule.aggregateModules(
+ CassandraMessageModule.MODULE,
+ CassandraBlobModule.MODULE,
+ CassandraSchemaVersionModule.MODULE);
+
+ @RegisterExtension
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULES);
+
+ private CassandraMessageDAO daoV2;
+ private CassandraMessageDAOV3 daoV3;
+ private CassandraMessageId.Factory messageIdFactory;
+
+ @BeforeEach
+ void setUp(CassandraCluster cassandra) {
+ BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+ .passthrough();
+ HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+ daoV2 = new CassandraMessageDAO(
+ cassandra.getConf(),
+ cassandra.getTypesProvider(),
+ blobStore,
+ blobIdFactory,
+ new CassandraMessageId.Factory(),
+ cassandraCluster.getCassandraConsistenciesConfiguration());
+ daoV3 = new CassandraMessageDAOV3(
+ cassandra.getConf(),
+ cassandra.getTypesProvider(),
+ blobStore,
+ blobIdFactory,
+ cassandraCluster.getCassandraConsistenciesConfiguration());
+ messageIdFactory = new CassandraMessageId.Factory();
+ }
+
+ @Test
+ void migrationTaskShouldMoveDataToMostRecentDao() throws Exception{
+ SimpleMailboxMessage message1 = createMessage(messageIdFactory.generate());
+ SimpleMailboxMessage message2 = createMessage(messageIdFactory.generate());
+ SimpleMailboxMessage message3 = createMessage(messageIdFactory.generate());
+ SimpleMailboxMessage message4 = createMessage(messageIdFactory.generate());
+
+ daoV2.save(message1).block();
+ daoV2.save(message2).block();
+ daoV2.save(message3).block();
+ daoV2.save(message4).block();
+
+ new MessageV3Migration(daoV2, daoV3).apply();
+
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId())
+ .isEqualTo(message1.getMessageId());
+ softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message2.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId())
+ .isEqualTo(message2.getMessageId());
+ softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message3.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId())
+ .isEqualTo(message3.getMessageId());
+ softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message4.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId())
+ .isEqualTo(message4.getMessageId());
+
+ softly.assertThat(daoV2.list().collectList().block()).isEmpty();
+ });
+ }
+
+ @Test
+ void migrationTaskShouldPreserveMessageContent() throws Exception{
+ SimpleMailboxMessage message1 = createMessage(messageIdFactory.generate());
+ daoV2.save(message1).block();
+ MessageRepresentation original = daoV2.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block();
+
+ new MessageV3Migration(daoV2, daoV3).apply();
+ MessageRepresentation migrated = daoV3.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block();
+
+ int start = 0;
+ int end = -1;
+ assertThat(migrated).isEqualToComparingOnlyGivenFields(original, "messageId",
+ "internalDate", "size", "bodyStartOctet", "properties", "attachments", "headerId", "bodyId");
+ assertThat(migrated.getContent().newStream(start, end))
+ .hasSameContentAs(original.getContent().newStream(start, end));
+ }
+
+ private SimpleMailboxMessage createMessage(MessageId messageId) {
+ return SimpleMailboxMessage.builder()
+ .messageId(messageId)
+ .mailboxId(MAILBOX_ID)
+ .uid(messageUid)
+ .internalDate(new Date())
+ .bodyStartOctet(MessageV3MigrationTest.BODY_START)
+ .size(MessageV3MigrationTest.CONTENT.length())
+ .content(new SharedByteArrayInputStream(MessageV3MigrationTest.CONTENT.getBytes(StandardCharsets.UTF_8)))
+ .flags(new Flags())
+ .properties(new PropertyBuilder().build())
+ .addAttachments(NO_ATTACHMENT)
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java
index 5754b71..6b63d07 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java
@@ -41,6 +41,7 @@ import static org.apache.james.mailbox.store.mail.model.StandardNames.MIME_SUB_T
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -222,6 +223,22 @@ public class Properties {
return new ArrayList<>(properties);
}
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof Properties) {
+ Properties that = (Properties) o;
+
+ return Objects.equals(this.textualLineCount, that.textualLineCount)
+ && Objects.equals(this.properties, that.properties);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(textualLineCount, properties);
+ }
+
/**
* Constructs a <code>String</code> with all attributes
* in name = value format.
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java
index 90e8911..ff0f783 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java
@@ -27,6 +27,7 @@ import org.apache.james.backends.cassandra.versions.SchemaTransition;
import org.apache.james.backends.cassandra.versions.SchemaVersion;
import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV2Migration;
import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV3Migration;
+import org.apache.james.mailbox.cassandra.mail.migration.MessageV3Migration;
import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration;
import org.apache.james.webadmin.Routes;
import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes;
@@ -42,6 +43,7 @@ public class CassandraRoutesModule extends AbstractModule {
private static final SchemaTransition FROM_V5_TO_V6 = SchemaTransition.to(new SchemaVersion(6));
private static final SchemaTransition FROM_V6_TO_V7 = SchemaTransition.to(new SchemaVersion(7));
private static final SchemaTransition FROM_V7_TO_V8 = SchemaTransition.to(new SchemaVersion(8));
+ private static final SchemaTransition FROM_V8_TO_V9 = SchemaTransition.to(new SchemaVersion(9));
@Override
protected void configure() {
@@ -60,6 +62,7 @@ public class CassandraRoutesModule extends AbstractModule {
allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class);
allMigrationClazzBinder.addBinding(FROM_V6_TO_V7).to(MappingsSourcesMigration.class);
allMigrationClazzBinder.addBinding(FROM_V7_TO_V8).to(MailboxPathV3Migration.class);
+ allMigrationClazzBinder.addBinding(FROM_V8_TO_V9).to(MessageV3Migration.class);
bind(SchemaVersion.class)
.annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION))
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org