You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/08/01 05:46:49 UTC

[1/2] james-project git commit: JAMES-2096 Add a fetch size to migration process V1 initial read

Repository: james-project
Updated Branches:
  refs/heads/master 9ed12378a -> ea823c645


JAMES-2096 Add a fetch size to migration process V1 initial read


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d640a485
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d640a485
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d640a485

Branch: refs/heads/master
Commit: d640a485b63889cf3beb3804e63a16e7699d3b72
Parents: 9ed1237
Author: benwa <bt...@linagora.com>
Authored: Mon Jul 31 11:21:49 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Aug 1 12:45:20 2017 +0700

----------------------------------------------------------------------
 .../cassandra/CassandraConfiguration.java       | 30 +++++++++++++++++---
 .../cassandra/CassandraConfigurationTest.java   | 19 +++++++++++++
 .../cassandra/mail/CassandraMessageDAO.java     |  2 +-
 .../modules/mailbox/CassandraSessionModule.java |  3 ++
 .../mailbox/CassandraSessionModuleTest.java     |  1 +
 .../modules/mailbox/cassandra.properties        |  1 +
 src/site/xdoc/server/config-cassandra.xml       |  2 ++
 7 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
index 5f0850b..5de74a4 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
@@ -42,6 +42,7 @@ public class CassandraConfiguration {
     public static final int DEFAULT_BLOB_PART_SIZE = 100 * 1024;
     public static final int DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH = 1000;
     public static final int DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT = 2;
+    public static final int DEFAULT_MIGRATION_V1_READ_FETCH_SIZE = 10;
 
     public static class Builder {
         private Optional<Integer> messageReadChunkSize = Optional.empty();
@@ -57,6 +58,7 @@ public class CassandraConfiguration {
         private Optional<Boolean> onTheFlyV1ToV2Migration = Optional.empty();
         private Optional<Integer> v1ToV2QueueLength = Optional.empty();
         private Optional<Integer> v1ToV2ThreadCount = Optional.empty();
+        private Optional<Integer> v1ReadFetchSize = Optional.empty();
 
         public Builder messageReadChunkSize(int value) {
             Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
@@ -130,6 +132,12 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder v1ReadFetchSize(int value) {
+            Preconditions.checkArgument(value > 0, "v1ReadFetchSize needs to be strictly positive");
+            this.v1ReadFetchSize = Optional.of(value);
+            return this;
+        }
+
         public Builder onTheFlyV1ToV2Migration(boolean value) {
             this.onTheFlyV1ToV2Migration = Optional.of(value);
             return this;
@@ -200,6 +208,11 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder v1ReadFetchSize(Optional<Integer> value) {
+            value.ifPresent(this::v1ReadFetchSize);
+            return this;
+        }
+
         public CassandraConfiguration build() {
             return new CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY),
                 messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ),
@@ -213,7 +226,8 @@ public class CassandraConfiguration {
                 blobPartSize.orElse(DEFAULT_BLOB_PART_SIZE),
                 onTheFlyV1ToV2Migration.orElse(DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2),
                 v1ToV2QueueLength.orElse(DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH),
-                v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT));
+                v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT),
+                v1ReadFetchSize.orElse(DEFAULT_MIGRATION_V1_READ_FETCH_SIZE));
         }
     }
 
@@ -234,13 +248,14 @@ public class CassandraConfiguration {
     private final boolean onTheFlyV1ToV2Migration;
     private final int v1ToV2QueueLength;
     private final int v1ToV2ThreadCount;
+    private final int v1ReadFetchSize;
 
     @VisibleForTesting
     CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
                            int flagsUpdateChunkSize, int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry,
                            int modSeqMaxRetry, int uidMaxRetry, int fetchNextPageInAdvanceRow,
                            int blobPartSize, boolean onTheFlyV1ToV2Migration, int v1ToV2QueueLength,
-                           int v1ToV2ThreadCount
+                           int v1ToV2ThreadCount, int v1ReadFetchSize
     ) {
         this.aclMaxRetry = aclMaxRetry;
         this.messageReadChunkSize = messageReadChunkSize;
@@ -255,6 +270,7 @@ public class CassandraConfiguration {
         this.onTheFlyV1ToV2Migration = onTheFlyV1ToV2Migration;
         this.v1ToV2QueueLength = v1ToV2QueueLength;
         this.v1ToV2ThreadCount = v1ToV2ThreadCount;
+        this.v1ReadFetchSize = v1ReadFetchSize;
     }
 
     public int getBlobPartSize() {
@@ -309,6 +325,10 @@ public class CassandraConfiguration {
         return v1ToV2ThreadCount;
     }
 
+    public int getV1ReadFetchSize() {
+        return v1ReadFetchSize;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof CassandraConfiguration) {
@@ -326,7 +346,8 @@ public class CassandraConfiguration {
                 && Objects.equals(this.blobPartSize, that.blobPartSize)
                 && Objects.equals(this.onTheFlyV1ToV2Migration, that.onTheFlyV1ToV2Migration)
                 && Objects.equals(this.v1ToV2ThreadCount, that.v1ToV2ThreadCount)
-                && Objects.equals(this.v1ToV2QueueLength, that.v1ToV2QueueLength);
+                && Objects.equals(this.v1ToV2QueueLength, that.v1ToV2QueueLength)
+                && Objects.equals(this.v1ReadFetchSize, that.v1ReadFetchSize);
         }
         return false;
     }
@@ -335,7 +356,7 @@ public class CassandraConfiguration {
     public final int hashCode() {
         return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry,
             flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow, flagsUpdateChunkSize,
-            blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, v1ToV2QueueLength);
+            blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, v1ToV2QueueLength, v1ReadFetchSize);
     }
 
     @Override
@@ -354,6 +375,7 @@ public class CassandraConfiguration {
             .add("onTheFlyV1ToV2Migration", onTheFlyV1ToV2Migration)
             .add("v1ToV2ThreadCount", v1ToV2ThreadCount)
             .add("v1ToV2QueueLength", v1ToV2QueueLength)
+            .add("v1ReadFetchSize", v1ReadFetchSize)
             .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
index e88d4a8..b38dbf8 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
@@ -224,6 +224,22 @@ public class CassandraConfigurationTest {
     }
 
     @Test
+    public void v1ReadFetchSizeShouldThrowOnNegative() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .v1ReadFetchSize(-1);
+    }
+
+    @Test
+    public void v1ReadFetchSizeShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .v1ReadFetchSize(0); // zero must be rejected: the builder requires a strictly positive fetch size
+    }
+
+    @Test
     public void builderShouldCreateTheRightObject() {
         int aclMaxRetry = 1;
         int modSeqMaxRetry = 2;
@@ -238,6 +254,7 @@ public class CassandraConfigurationTest {
         boolean onTheFlyV1ToV2Migration = true;
         int v1ToV2ThreadCount = 11;
         int v1ToV2QueueLength = 12;
+        int v1ReadFetchSize = 13;
 
         CassandraConfiguration configuration = CassandraConfiguration.builder()
             .aclMaxRetry(aclMaxRetry)
@@ -253,6 +270,7 @@ public class CassandraConfigurationTest {
             .onTheFlyV1ToV2Migration(onTheFlyV1ToV2Migration)
             .v1ToV2ThreadCount(v1ToV2ThreadCount)
             .v1ToV2QueueLength(v1ToV2QueueLength)
+            .v1ReadFetchSize(v1ReadFetchSize)
             .build();
 
         softly.assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry);
@@ -268,6 +286,7 @@ public class CassandraConfigurationTest {
         softly.assertThat(configuration.isOnTheFlyV1ToV2Migration()).isEqualTo(onTheFlyV1ToV2Migration);
         softly.assertThat(configuration.getV1ToV2ThreadCount()).isEqualTo(v1ToV2ThreadCount);
         softly.assertThat(configuration.getV1ToV2QueueLength()).isEqualTo(v1ToV2QueueLength);
+        softly.assertThat(configuration.getV1ReadFetchSize()).isEqualTo(v1ReadFetchSize);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 78f6372..ba4802e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -154,7 +154,7 @@ public class CassandraMessageDAO {
 
     public Stream<RawMessage> readAll() {
         return cassandraUtils.convertToStream(
-            cassandraAsyncExecutor.execute(selectAll.bind())
+            cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
                 .join())
             .map(this::fromRow);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 12dc844..a9447f9 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -98,6 +98,7 @@ public class CassandraSessionModule extends AbstractModule {
     private static final String MIGRATION_V1_V2_ON_THE_FLY = "migration.v1.v2.on.the.fly";
     private static final String MIGRATION_V1_V2_THREAD_COUNT = "migration.v1.v2.thread.count";
     private static final String MIGRATION_V1_V2_QUEUE_LENGTH = "migration.v1.v2.queue.length";
+    public static final String MIGRATION_V1_READ_SIZE = "migration.v1.read.fetch.size";
 
     @Override
     protected void configure() {
@@ -303,6 +304,8 @@ public class CassandraSessionModule extends AbstractModule {
                 propertiesConfiguration.getInteger(MIGRATION_V1_V2_THREAD_COUNT, null)))
             .v1ToV2QueueLength(Optional.ofNullable(
                 propertiesConfiguration.getInteger(MIGRATION_V1_V2_QUEUE_LENGTH, null)))
+            .v1ReadFetchSize(Optional.ofNullable(
+                propertiesConfiguration.getInteger(MIGRATION_V1_READ_SIZE, null)))
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
index 0ae927c..7ccc8a8 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
@@ -63,6 +63,7 @@ public class CassandraSessionModuleTest {
                 .onTheFlyV1ToV2Migration(true)
                 .v1ToV2ThreadCount(11)
                 .v1ToV2QueueLength(12)
+                .v1ReadFetchSize(13)
                 .build());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
index a625b28..54ede24 100644
--- a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
+++ b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
@@ -11,3 +11,4 @@ mailbox.blob.part.size=10
 migration.v1.v2.on.the.fly=true
 migration.v1.v2.thread.count=11
 migration.v1.v2.queue.length=12
+migration.v1.read.fetch.size=13

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/src/site/xdoc/server/config-cassandra.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/server/config-cassandra.xml b/src/site/xdoc/server/config-cassandra.xml
index fab44ec..2534ceb 100644
--- a/src/site/xdoc/server/config-cassandra.xml
+++ b/src/site/xdoc/server/config-cassandra.xml
@@ -119,6 +119,8 @@
         <dd>Optional. Defaults to 2.<br/> Controls the number of threads used to asynchronously migrate from v1 to v2.</dd>
         <dt><strong>migration.v1.v2.queue.length</strong></dt>
         <dd>Optional. Defaults to 1000.<br/> Controls the queue size of v1 to v2 migration task. Drops when full.</dd>
+        <dt><strong>migration.v1.read.fetch.size</strong></dt>
+        <dd>Optional. Defaults to 10.<br/> Controls the fetch size of the request to retrieve all messages stored in V1 during the migration process.</dd>
       </dl>
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/2] james-project git commit: JAMES-2105 Correct on the fly migration when attachments

Posted by bt...@apache.org.
JAMES-2105 Correct on the fly migration when attachments


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ea823c64
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ea823c64
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ea823c64

Branch: refs/heads/master
Commit: ea823c645e99f2d355d863fc7593eb396eaea863
Parents: d640a48
Author: benwa <bt...@linagora.com>
Authored: Mon Jul 31 15:17:02 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Aug 1 12:46:04 2017 +0700

----------------------------------------------------------------------
 .../mail/MessageAttachmentRepresentation.java   | 11 ++++++
 .../mail/migration/V1ToV2Migration.java         | 10 ++++-
 .../mail/migration/V1ToV2MigrationThread.java   |  9 +++--
 .../mail/migration/V1ToV2MigrationTest.java     | 41 ++++++++++++++++++++
 4 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
index 838ac56..172c550 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
@@ -23,6 +23,8 @@ import java.util.Optional;
 
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.Cid;
+import org.apache.james.mailbox.model.MessageAttachment;
+import org.apache.james.util.OptionalConverter;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -35,6 +37,15 @@ public class MessageAttachmentRepresentation {
         return new Builder();
     }
 
+    public static MessageAttachmentRepresentation fromAttachment(MessageAttachment attachment) {
+        return builder()
+            .attachmentId(attachment.getAttachmentId())
+            .cid(OptionalConverter.fromGuava(attachment.getCid()))
+            .isInline(attachment.isInline())
+            .name(attachment.getName().orNull())
+            .build();
+    }
+
     public static class Builder {
 
         private AttachmentId attachmentId;

http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 7cb85bf..3d50f84 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail.migration;
 
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 public class V1ToV2Migration implements Migration {
@@ -55,7 +57,7 @@ public class V1ToV2Migration implements Migration {
     private final AttachmentLoader attachmentLoader;
     private final CassandraConfiguration cassandraConfiguration;
     private final ExecutorService migrationExecutor;
-    private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
+    private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated;
 
     @Inject
     public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2,
@@ -99,11 +101,15 @@ public class V1ToV2Migration implements Migration {
 
     private Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> submitMigration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) {
         if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) {
+            Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>> messageV1WithAttachmentCopied =
+                Pair.of(messageV1.getLeft(), messageV1.getRight().collect(Guavate.toImmutableList()));
             synchronized (messagesToBeMigrated) {
-                if (!messagesToBeMigrated.offer(messageV1)) {
+
+                if (!messagesToBeMigrated.offer(messageV1WithAttachmentCopied)) {
                     LOGGER.info("Migration queue is full message {} is ignored", messageV1.getLeft().getMessageId());
                 }
             }
+            return Pair.of(messageV1.getLeft(), messageV1WithAttachmentCopied.getRight().stream());
         }
         return messageV1;
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
index 1b96179..c65a521 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail.migration;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -41,12 +42,12 @@ public class V1ToV2MigrationThread implements Runnable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(V1ToV2MigrationThread.class);
 
-    private final BlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
+    private final BlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated;
     private final CassandraMessageDAO messageDAOV1;
     private final CassandraMessageDAOV2 messageDAOV2;
     private final AttachmentLoader attachmentLoader;
 
-    public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated,
+    public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated,
                                  CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, AttachmentLoader attachmentLoader) {
         this.messagesToBeMigrated = messagesToBeMigrated;
         this.messageDAOV1 = messageDAOV1;
@@ -58,8 +59,8 @@ public class V1ToV2MigrationThread implements Runnable {
     public void run() {
         while (true) {
             try {
-                Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take();
-                performV1ToV2Migration(message).join();
+                Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take();
+                performV1ToV2Migration(Pair.of(message.getLeft(), message.getRight().stream())).join();
             } catch (Exception e) {
                 LOGGER.error("Error occured in migration thread", e);
             }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
index 5825a92..8cce8fc 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
@@ -24,11 +24,14 @@ import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.util.SharedByteArrayInputStream;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
@@ -40,6 +43,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
 import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
+import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
 import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
@@ -61,6 +65,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
 import com.jayway.awaitility.Awaitility;
@@ -236,6 +241,42 @@ public class V1ToV2MigrationTest {
             .build());
     }
 
+    @Test
+    public void migratedDataShouldBeRetrievedNoAttachment() throws Exception {
+        SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START,
+            new PropertyBuilder(), ImmutableList.of());
+
+        messageDAOV1.save(originalMessage).join();
+
+        Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> migratedData =
+            testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
+
+        awaitMigration();
+
+        softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId);
+        softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList()))
+            .isEmpty();
+    }
+
+    @Test
+    public void migratedDataShouldBeRetrievedWhenAttachment() throws Exception {
+        SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START,
+            new PropertyBuilder(), ImmutableList.of(messageAttachment));
+
+        attachmentMapper.storeAttachment(attachment);
+
+        messageDAOV1.save(originalMessage).join();
+
+        Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> migratedData =
+            testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
+
+        awaitMigration();
+
+        softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId);
+        softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList()))
+            .containsOnly(MessageAttachmentRepresentation.fromAttachment(messageAttachment));
+    }
+
     private void awaitMigration() {
         awaitability.atMost(1, TimeUnit.MINUTES)
             .until(() -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org