You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/08/01 05:46:49 UTC
[1/2] james-project git commit: JAMES-2096 Add a fetch size to migration process V1 initial read
Repository: james-project
Updated Branches:
refs/heads/master 9ed12378a -> ea823c645
JAMES-2096 Add a fetch size to migration process V1 initial read
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d640a485
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d640a485
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d640a485
Branch: refs/heads/master
Commit: d640a485b63889cf3beb3804e63a16e7699d3b72
Parents: 9ed1237
Author: benwa <bt...@linagora.com>
Authored: Mon Jul 31 11:21:49 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Aug 1 12:45:20 2017 +0700
----------------------------------------------------------------------
.../cassandra/CassandraConfiguration.java | 30 +++++++++++++++++---
.../cassandra/CassandraConfigurationTest.java | 19 +++++++++++++
.../cassandra/mail/CassandraMessageDAO.java | 2 +-
.../modules/mailbox/CassandraSessionModule.java | 3 ++
.../mailbox/CassandraSessionModuleTest.java | 1 +
.../modules/mailbox/cassandra.properties | 1 +
src/site/xdoc/server/config-cassandra.xml | 2 ++
7 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
index 5f0850b..5de74a4 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
@@ -42,6 +42,7 @@ public class CassandraConfiguration {
public static final int DEFAULT_BLOB_PART_SIZE = 100 * 1024;
public static final int DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH = 1000;
public static final int DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT = 2;
+ public static final int DEFAULT_MIGRATION_V1_READ_FETCH_SIZE = 10;
public static class Builder {
private Optional<Integer> messageReadChunkSize = Optional.empty();
@@ -57,6 +58,7 @@ public class CassandraConfiguration {
private Optional<Boolean> onTheFlyV1ToV2Migration = Optional.empty();
private Optional<Integer> v1ToV2QueueLength = Optional.empty();
private Optional<Integer> v1ToV2ThreadCount = Optional.empty();
+ private Optional<Integer> v1ReadFetchSize = Optional.empty();
public Builder messageReadChunkSize(int value) {
Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
@@ -130,6 +132,12 @@ public class CassandraConfiguration {
return this;
}
+ public Builder v1ReadFetchSize(int value) {
+ Preconditions.checkArgument(value > 0, "v1ReadFetchSize needs to be strictly positive");
+ this.v1ReadFetchSize = Optional.of(value);
+ return this;
+ }
+
public Builder onTheFlyV1ToV2Migration(boolean value) {
this.onTheFlyV1ToV2Migration = Optional.of(value);
return this;
@@ -200,6 +208,11 @@ public class CassandraConfiguration {
return this;
}
+ public Builder v1ReadFetchSize(Optional<Integer> value) {
+ value.ifPresent(this::v1ReadFetchSize);
+ return this;
+ }
+
public CassandraConfiguration build() {
return new CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY),
messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ),
@@ -213,7 +226,8 @@ public class CassandraConfiguration {
blobPartSize.orElse(DEFAULT_BLOB_PART_SIZE),
onTheFlyV1ToV2Migration.orElse(DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2),
v1ToV2QueueLength.orElse(DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH),
- v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT));
+ v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT),
+ v1ReadFetchSize.orElse(DEFAULT_MIGRATION_V1_READ_FETCH_SIZE));
}
}
@@ -234,13 +248,14 @@ public class CassandraConfiguration {
private final boolean onTheFlyV1ToV2Migration;
private final int v1ToV2QueueLength;
private final int v1ToV2ThreadCount;
+ private final int v1ReadFetchSize;
@VisibleForTesting
CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
int flagsUpdateChunkSize, int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry,
int modSeqMaxRetry, int uidMaxRetry, int fetchNextPageInAdvanceRow,
int blobPartSize, boolean onTheFlyV1ToV2Migration, int v1ToV2QueueLength,
- int v1ToV2ThreadCount
+ int v1ToV2ThreadCount, int v1ReadFetchSize
) {
this.aclMaxRetry = aclMaxRetry;
this.messageReadChunkSize = messageReadChunkSize;
@@ -255,6 +270,7 @@ public class CassandraConfiguration {
this.onTheFlyV1ToV2Migration = onTheFlyV1ToV2Migration;
this.v1ToV2QueueLength = v1ToV2QueueLength;
this.v1ToV2ThreadCount = v1ToV2ThreadCount;
+ this.v1ReadFetchSize = v1ReadFetchSize;
}
public int getBlobPartSize() {
@@ -309,6 +325,10 @@ public class CassandraConfiguration {
return v1ToV2ThreadCount;
}
+ public int getV1ReadFetchSize() {
+ return v1ReadFetchSize;
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof CassandraConfiguration) {
@@ -326,7 +346,8 @@ public class CassandraConfiguration {
&& Objects.equals(this.blobPartSize, that.blobPartSize)
&& Objects.equals(this.onTheFlyV1ToV2Migration, that.onTheFlyV1ToV2Migration)
&& Objects.equals(this.v1ToV2ThreadCount, that.v1ToV2ThreadCount)
- && Objects.equals(this.v1ToV2QueueLength, that.v1ToV2QueueLength);
+ && Objects.equals(this.v1ToV2QueueLength, that.v1ToV2QueueLength)
+ && Objects.equals(this.v1ReadFetchSize, that.v1ReadFetchSize);
}
return false;
}
@@ -335,7 +356,7 @@ public class CassandraConfiguration {
public final int hashCode() {
return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry,
flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow, flagsUpdateChunkSize,
- blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, v1ToV2QueueLength);
+ blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, v1ToV2QueueLength, v1ReadFetchSize);
}
@Override
@@ -354,6 +375,7 @@ public class CassandraConfiguration {
.add("onTheFlyV1ToV2Migration", onTheFlyV1ToV2Migration)
.add("v1ToV2ThreadCount", v1ToV2ThreadCount)
.add("v1ToV2QueueLength", v1ToV2QueueLength)
+ .add("v1ReadFetchSize", v1ReadFetchSize)
.toString();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
index e88d4a8..b38dbf8 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
@@ -224,6 +224,22 @@ public class CassandraConfigurationTest {
}
@Test
+ public void v1ReadFetchSizeShouldThrowOnNegative() {
+ expectedException.expect(IllegalArgumentException.class);
+
+ CassandraConfiguration.builder()
+ .v1ReadFetchSize(-1);
+ }
+
+ @Test
+ public void v1ReadFetchSizeShouldThrowOnZero() {
+ expectedException.expect(IllegalArgumentException.class); // zero is not strictly positive, so the builder must reject it
+
+ CassandraConfiguration.builder()
+ .v1ReadFetchSize(0); // exercise the setter this test is named after (previously called v1ToV2QueueLength by mistake)
+ }
+
+ @Test
public void builderShouldCreateTheRightObject() {
int aclMaxRetry = 1;
int modSeqMaxRetry = 2;
@@ -238,6 +254,7 @@ public class CassandraConfigurationTest {
boolean onTheFlyV1ToV2Migration = true;
int v1ToV2ThreadCount = 11;
int v1ToV2QueueLength = 12;
+ int v1ReadFetchSize = 13;
CassandraConfiguration configuration = CassandraConfiguration.builder()
.aclMaxRetry(aclMaxRetry)
@@ -253,6 +270,7 @@ public class CassandraConfigurationTest {
.onTheFlyV1ToV2Migration(onTheFlyV1ToV2Migration)
.v1ToV2ThreadCount(v1ToV2ThreadCount)
.v1ToV2QueueLength(v1ToV2QueueLength)
+ .v1ReadFetchSize(v1ReadFetchSize)
.build();
softly.assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry);
@@ -268,6 +286,7 @@ public class CassandraConfigurationTest {
softly.assertThat(configuration.isOnTheFlyV1ToV2Migration()).isEqualTo(onTheFlyV1ToV2Migration);
softly.assertThat(configuration.getV1ToV2ThreadCount()).isEqualTo(v1ToV2ThreadCount);
softly.assertThat(configuration.getV1ToV2QueueLength()).isEqualTo(v1ToV2QueueLength);
+ softly.assertThat(configuration.getV1ReadFetchSize()).isEqualTo(v1ReadFetchSize);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 78f6372..ba4802e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -154,7 +154,7 @@ public class CassandraMessageDAO {
public Stream<RawMessage> readAll() {
return cassandraUtils.convertToStream(
- cassandraAsyncExecutor.execute(selectAll.bind())
+ cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
.join())
.map(this::fromRow);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 12dc844..a9447f9 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -98,6 +98,7 @@ public class CassandraSessionModule extends AbstractModule {
private static final String MIGRATION_V1_V2_ON_THE_FLY = "migration.v1.v2.on.the.fly";
private static final String MIGRATION_V1_V2_THREAD_COUNT = "migration.v1.v2.thread.count";
private static final String MIGRATION_V1_V2_QUEUE_LENGTH = "migration.v1.v2.queue.length";
+ public static final String MIGRATION_V1_READ_SIZE = "migration.v1.read.fetch.size";
@Override
protected void configure() {
@@ -303,6 +304,8 @@ public class CassandraSessionModule extends AbstractModule {
propertiesConfiguration.getInteger(MIGRATION_V1_V2_THREAD_COUNT, null)))
.v1ToV2QueueLength(Optional.ofNullable(
propertiesConfiguration.getInteger(MIGRATION_V1_V2_QUEUE_LENGTH, null)))
+ .v1ReadFetchSize(Optional.ofNullable(
+ propertiesConfiguration.getInteger(MIGRATION_V1_READ_SIZE, null)))
.build();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
index 0ae927c..7ccc8a8 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
@@ -63,6 +63,7 @@ public class CassandraSessionModuleTest {
.onTheFlyV1ToV2Migration(true)
.v1ToV2ThreadCount(11)
.v1ToV2QueueLength(12)
+ .v1ReadFetchSize(13)
.build());
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
index a625b28..54ede24 100644
--- a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
+++ b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
@@ -11,3 +11,4 @@ mailbox.blob.part.size=10
migration.v1.v2.on.the.fly=true
migration.v1.v2.thread.count=11
migration.v1.v2.queue.length=12
+migration.v1.read.fetch.size=13
http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/src/site/xdoc/server/config-cassandra.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/server/config-cassandra.xml b/src/site/xdoc/server/config-cassandra.xml
index fab44ec..2534ceb 100644
--- a/src/site/xdoc/server/config-cassandra.xml
+++ b/src/site/xdoc/server/config-cassandra.xml
@@ -119,6 +119,8 @@
<dd>Optional. Defaults to 2.<br/> Controls the number of threads used to asynchronously migrate from v1 to v2.</dd>
<dt><strong>migration.v1.v2.queue.length</strong></dt>
<dd>Optional. Defaults to 1000.<br/> Controls the queue size of v1 to v2 migration task. Drops when full.</dd>
+ <dt><strong>migration.v1.read.fetch.size</strong></dt>
+ <dd>Optional. Defaults to 10.<br/> Controls the fetch size of the request to retrieve all messages stored in V1 during the migration process.</dd>
</dl>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[2/2] james-project git commit: JAMES-2105 Correct on the fly migration when attachments
Posted by bt...@apache.org.
JAMES-2105 Correct on the fly migration when attachments
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ea823c64
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ea823c64
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ea823c64
Branch: refs/heads/master
Commit: ea823c645e99f2d355d863fc7593eb396eaea863
Parents: d640a48
Author: benwa <bt...@linagora.com>
Authored: Mon Jul 31 15:17:02 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Aug 1 12:46:04 2017 +0700
----------------------------------------------------------------------
.../mail/MessageAttachmentRepresentation.java | 11 ++++++
.../mail/migration/V1ToV2Migration.java | 10 ++++-
.../mail/migration/V1ToV2MigrationThread.java | 9 +++--
.../mail/migration/V1ToV2MigrationTest.java | 41 ++++++++++++++++++++
4 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
index 838ac56..172c550 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
@@ -23,6 +23,8 @@ import java.util.Optional;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.Cid;
+import org.apache.james.mailbox.model.MessageAttachment;
+import org.apache.james.util.OptionalConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -35,6 +37,15 @@ public class MessageAttachmentRepresentation {
return new Builder();
}
+ public static MessageAttachmentRepresentation fromAttachment(MessageAttachment attachment) {
+ return builder()
+ .attachmentId(attachment.getAttachmentId())
+ .cid(OptionalConverter.fromGuava(attachment.getCid()))
+ .isInline(attachment.isInline())
+ .name(attachment.getName().orNull())
+ .build();
+ }
+
public static class Builder {
private AttachmentId attachmentId;
http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 7cb85bf..3d50f84 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.cassandra.mail.migration;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
public class V1ToV2Migration implements Migration {
@@ -55,7 +57,7 @@ public class V1ToV2Migration implements Migration {
private final AttachmentLoader attachmentLoader;
private final CassandraConfiguration cassandraConfiguration;
private final ExecutorService migrationExecutor;
- private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
+ private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated;
@Inject
public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2,
@@ -99,11 +101,15 @@ public class V1ToV2Migration implements Migration {
private Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> submitMigration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) {
if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) {
+ Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>> messageV1WithAttachmentCopied =
+ Pair.of(messageV1.getLeft(), messageV1.getRight().collect(Guavate.toImmutableList()));
synchronized (messagesToBeMigrated) {
- if (!messagesToBeMigrated.offer(messageV1)) {
+
+ if (!messagesToBeMigrated.offer(messageV1WithAttachmentCopied)) {
LOGGER.info("Migration queue is full message {} is ignored", messageV1.getLeft().getMessageId());
}
}
+ return Pair.of(messageV1.getLeft(), messageV1WithAttachmentCopied.getRight().stream());
}
return messageV1;
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
index 1b96179..c65a521 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.cassandra.mail.migration;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -41,12 +42,12 @@ public class V1ToV2MigrationThread implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(V1ToV2MigrationThread.class);
- private final BlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
+ private final BlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated;
private final CassandraMessageDAO messageDAOV1;
private final CassandraMessageDAOV2 messageDAOV2;
private final AttachmentLoader attachmentLoader;
- public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated,
+ public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated,
CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, AttachmentLoader attachmentLoader) {
this.messagesToBeMigrated = messagesToBeMigrated;
this.messageDAOV1 = messageDAOV1;
@@ -58,8 +59,8 @@ public class V1ToV2MigrationThread implements Runnable {
public void run() {
while (true) {
try {
- Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take();
- performV1ToV2Migration(message).join();
+ Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take();
+ performV1ToV2Migration(Pair.of(message.getLeft(), message.getRight().stream())).join();
} catch (Exception e) {
LOGGER.error("Error occured in migration thread", e);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
index 5825a92..8cce8fc 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
@@ -24,11 +24,14 @@ import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.util.SharedByteArrayInputStream;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
@@ -40,6 +43,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
+import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
import org.apache.james.mailbox.cassandra.mail.utils.Limit;
import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
@@ -61,6 +65,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import com.github.steveash.guavate.Guavate;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.jayway.awaitility.Awaitility;
@@ -236,6 +241,42 @@ public class V1ToV2MigrationTest {
.build());
}
+ @Test
+ public void migratedDataShouldBeRetrievedNoAttachment() throws Exception {
+ SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START,
+ new PropertyBuilder(), ImmutableList.of());
+
+ messageDAOV1.save(originalMessage).join();
+
+ Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> migratedData =
+ testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
+
+ awaitMigration();
+
+ softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId);
+ softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList()))
+ .isEmpty();
+ }
+
+ @Test
+ public void migratedDataShouldBeRetrievedWhenAttachment() throws Exception {
+ SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START,
+ new PropertyBuilder(), ImmutableList.of(messageAttachment));
+
+ attachmentMapper.storeAttachment(attachment);
+
+ messageDAOV1.save(originalMessage).join();
+
+ Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> migratedData =
+ testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
+
+ awaitMigration();
+
+ softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId);
+ softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList()))
+ .containsOnly(MessageAttachmentRepresentation.fromAttachment(messageAttachment));
+ }
+
private void awaitMigration() {
awaitability.atMost(1, TimeUnit.MINUTES)
.until(() -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org