You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/04 08:13:25 UTC

[10/21] james-project git commit: JAMES-2272 Cassandra migrations should rely on TaskManager

JAMES-2272 Cassandra migrations should rely on TaskManager

 - Use MigrationTask to add context to migration tasks
 - This makes migration manageable
 - Concurrency concerns are pushed to the TaskManager (which has sequential task execution) thus related tests can be removed. Note that an integration test demonstrate no WebAdmin behaviour regression was introduced on that point.

In order to achieve this result, CassandraMigrationService has to generate a Migration that is a combination of required migrations.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b1a7139b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b1a7139b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b1a7139b

Branch: refs/heads/master
Commit: b1a7139bbb9f720faa12bb00d52c7d71ce7b6e11
Parents: d9d73f8
Author: benwa <bt...@linagora.com>
Authored: Wed Dec 27 13:41:23 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 4 15:03:36 2018 +0700

----------------------------------------------------------------------
 .../migration/CassandraMigrationService.java    | 75 +++++++++++------
 .../backends/cassandra/migration/Migration.java | 10 +++
 .../cassandra/migration/MigrationTask.java      | 61 ++++++++++++++
 .../CassandraMigrationServiceTest.java          | 61 +++-----------
 .../cassandra/migration/MigrationTest.java      | 89 ++++++++++++++++++++
 .../migration/AttachmentMessageIdCreation.java  |  3 +-
 .../mail/migration/AttachmentV2Migration.java   |  3 +-
 .../WebAdminServerIntegrationTest.java          | 66 ++++++++++++---
 .../routes/CassandraMigrationRoutes.java        | 48 +++++------
 .../routes/CassandraMigrationRoutesTest.java    | 80 +++++++++++++++---
 .../apache/james/webadmin/dto/TaskIdDto.java    | 13 +++
 11 files changed, 379 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
index 0eb80aa..9c494e1 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.backends.cassandra.migration;
 
+import static org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager.DEFAULT_VERSION;
+
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.IntStream;
@@ -28,7 +30,6 @@ import javax.inject.Named;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,40 +58,62 @@ public class CassandraMigrationService {
         return Optional.of(latestVersion);
     }
 
-    public synchronized void upgradeToVersion(int newVersion) {
-        int currentVersion = schemaVersionDAO.getCurrentSchemaVersion().join().orElse(CassandraSchemaVersionManager.DEFAULT_VERSION);
-        if (currentVersion >= newVersion) {
-            throw new IllegalStateException("Current version is already up to date");
-        }
+    public Migration upgradeToVersion(int newVersion) {
+        int currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION);
+        assertMigrationNeeded(newVersion, currentVersion);
 
-        IntStream.range(currentVersion, newVersion)
+        Migration migrationCombination = IntStream.range(currentVersion, newVersion)
             .boxed()
-            .forEach(this::doMigration);
+            .map(this::validateVersionNumber)
+            .map(this::toMigration)
+            .reduce(Migration.IDENTITY, Migration::combine);
+        return new MigrationTask(migrationCombination, newVersion);
     }
 
-    public void upgradeToLastVersion() {
-        upgradeToVersion(latestVersion);
+    private void assertMigrationNeeded(int newVersion, int currentVersion) {
+        boolean needMigration = currentVersion < newVersion;
+        if (!needMigration) {
+            throw new IllegalStateException("Current version is already up to date");
+        }
     }
 
-    private void doMigration(Integer version) {
-        if (allMigrationClazz.containsKey(version)) {
-            LOG.info("Migrating to version {} ", version + 1);
-            Migration.Result migrationResult = allMigrationClazz.get(version).run();
-            if (migrationResult == Migration.Result.COMPLETED) {
-                schemaVersionDAO.updateVersion(version + 1);
-                LOG.info("Migrating to version {} done", version + 1);
-            } else {
-                String message = String.format("Migrating to version %d partially done. " +
-                    "Please check logs for cause of failure and re-run this migration.",
-                    version + 1);
-                LOG.warn(message);
-                throw new MigrationException(message);
-            }
-        } else {
-            String message = String.format("Can not migrate to %d. No migration class registered.", version + 1);
+    private Integer validateVersionNumber(Integer versionNumber) {
+        if (!allMigrationClazz.containsKey(versionNumber)) {
+            String message = String.format("Can not migrate to %d. No migration class registered.", versionNumber);
             LOG.error(message);
             throw new NotImplementedException(message);
         }
+        return versionNumber;
+    }
+
+    public Migration upgradeToLastVersion() {
+        return upgradeToVersion(latestVersion);
+    }
+
+    private Migration toMigration(Integer version) {
+        return () -> {
+            int newVersion = version + 1;
+            int currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION);
+            if (currentVersion >= newVersion) {
+                return Migration.Result.PARTIAL;
+            }
+
+            LOG.info("Migrating to version {} ", newVersion);
+            return allMigrationClazz.get(version).run()
+                .onComplete(() -> schemaVersionDAO.updateVersion(newVersion),
+                    () -> LOG.info("Migrating to version {} done", newVersion))
+                .onFailure(() -> LOG.warn(failureMessage(newVersion)),
+                    () -> throwMigrationException(newVersion));
+        };
+    }
+
+    private void throwMigrationException(int newVersion) {
+        throw new MigrationException(failureMessage(newVersion));
+    }
+
+    private String failureMessage(Integer newVersion) {
+        return String.format("Migrating to version %d partially done. " +
+                "Please check logs for cause of failure and re-run this migration.", newVersion);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
index b4f15f4..18be8dc 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/Migration.java
@@ -22,5 +22,15 @@ package org.apache.james.backends.cassandra.migration;
 import org.apache.james.task.Task;
 
 public interface Migration extends Task {
+    Migration IDENTITY = () -> Result.COMPLETED;
 
+    static Migration combine(Migration migration1, Migration migration2) {
+        return () -> {
+            Result migration1Result = migration1.run();
+            if (migration1Result == Result.COMPLETED) {
+                return migration2.run();
+            }
+            return Result.PARTIAL;
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java
new file mode 100644
index 0000000..518b682
--- /dev/null
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationTask.java
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+import java.util.Optional;
+
+public class MigrationTask implements Migration {
+    public static final String CASSANDRA_MIGRATION = "CassandraMigration";
+
+    public static class Details {
+        private final int toVersion;
+
+        public Details(int toVersion) {
+            this.toVersion = toVersion;
+        }
+
+        public int getToVersion() {
+            return toVersion;
+        }
+    }
+
+    private final Migration migration;
+    private final int toVersion;
+
+    public MigrationTask(Migration migration, int toVersion) {
+        this.migration = migration;
+        this.toVersion = toVersion;
+    }
+
+    @Override
+    public Result run() {
+        return migration.run();
+    }
+
+    @Override
+    public String type() {
+        return CASSANDRA_MIGRATION;
+    }
+
+    @Override
+    public Optional<Object> details() {
+        return Optional.of(new Details(toVersion));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
index fa2a29f..5dea96b 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
@@ -21,7 +21,6 @@ package org.apache.james.backends.cassandra.migration;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -34,8 +33,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
@@ -46,12 +43,12 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import com.datastax.driver.core.Session;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 
 public class CassandraMigrationServiceTest {
     private static final int LATEST_VERSION = 3;
-    private static final int CURRENT_VERSION = 2;
+    private static final int INTERMEDIARY_VERSION = 2;
+    private static final int CURRENT_VERSION = INTERMEDIARY_VERSION;
     private static final int OLDER_VERSION = 1;
     private CassandraMigrationService testee;
     private CassandraSchemaVersionDAO schemaVersionDAO;
@@ -98,14 +95,14 @@ public class CassandraMigrationServiceTest {
 
         when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
 
-        testee.upgradeToVersion(OLDER_VERSION);
+        testee.upgradeToVersion(OLDER_VERSION).run();
     }
 
     @Test
     public void upgradeToVersionShouldUpdateToVersion() throws Exception {
         when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
 
-        testee.upgradeToVersion(CURRENT_VERSION);
+        testee.upgradeToVersion(CURRENT_VERSION).run();
 
         verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION));
     }
@@ -116,14 +113,14 @@ public class CassandraMigrationServiceTest {
 
         when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));
 
-        testee.upgradeToLastVersion();
+        testee.upgradeToLastVersion().run();
     }
 
     @Test
     public void upgradeToLastVersionShouldUpdateToLatestVersion() throws Exception {
         when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
 
-        testee.upgradeToLastVersion();
+        testee.upgradeToLastVersion().run();
 
         verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION));
     }
@@ -139,7 +136,7 @@ public class CassandraMigrationServiceTest {
 
         expectedException.expect(NotImplementedException.class);
 
-        testee.upgradeToVersion(LATEST_VERSION);
+        testee.upgradeToVersion(LATEST_VERSION).run();
     }
 
     @Test
@@ -147,6 +144,7 @@ public class CassandraMigrationServiceTest {
         try {
             Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
                 .put(OLDER_VERSION, successfulMigration)
+                .put(INTERMEDIARY_VERSION, () -> Migration.Result.PARTIAL)
                 .put(LATEST_VERSION, successfulMigration)
                 .build();
             testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
@@ -154,50 +152,13 @@ public class CassandraMigrationServiceTest {
 
             expectedException.expect(RuntimeException.class);
 
-            testee.upgradeToVersion(LATEST_VERSION);
+            testee.upgradeToVersion(LATEST_VERSION).run();
         } finally {
             verify(schemaVersionDAO).updateVersion(CURRENT_VERSION);
         }
     }
 
     @Test
-    public void concurrentMigrationsShouldFail() throws Exception {
-        // Given a stateful migration service
-        Migration wait1SecondMigration = mock(Migration.class);
-        doAnswer(invocation -> {
-            Thread.sleep(1000);
-            return Migration.Result.COMPLETED;
-        }).when(wait1SecondMigration).run();
-        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
-            .put(OLDER_VERSION, wait1SecondMigration)
-            .put(CURRENT_VERSION, wait1SecondMigration)
-            .put(LATEST_VERSION, wait1SecondMigration)
-            .build();
-        testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
-
-        // When I perform a concurrent migration
-        AtomicInteger encounteredExceptionCount = new AtomicInteger(0);
-        executorService.submit(() -> testee.upgradeToVersion(LATEST_VERSION));
-        executorService.submit(() -> {
-            try {
-                Thread.sleep(500);
-            } catch (InterruptedException e) {
-                throw Throwables.propagate(e);
-            }
-
-            try {
-                testee.upgradeToVersion(LATEST_VERSION);
-            } catch (IllegalStateException e) {
-                encounteredExceptionCount.incrementAndGet();
-            }
-        });
-        executorService.awaitTermination(10, TimeUnit.SECONDS);
-
-        // Then the second migration fails
-        assertThat(encounteredExceptionCount.get()).isEqualTo(1);
-    }
-
-    @Test
     public void partialMigrationShouldThrow() throws Exception {
         Migration migration1 = mock(Migration.class);
         when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
@@ -211,7 +172,7 @@ public class CassandraMigrationServiceTest {
 
         expectedException.expect(MigrationException.class);
 
-        testee.upgradeToVersion(LATEST_VERSION);
+        testee.upgradeToVersion(LATEST_VERSION).run();
     }
 
     @Test
@@ -230,7 +191,7 @@ public class CassandraMigrationServiceTest {
         expectedException.expect(MigrationException.class);
 
         try {
-            testee.upgradeToVersion(LATEST_VERSION);
+            testee.upgradeToVersion(LATEST_VERSION).run();
         } finally {
             verify(migration1, times(1)).run();
             verifyNoMoreInteractions(migration1);

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/MigrationTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/MigrationTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/MigrationTest.java
new file mode 100644
index 0000000..f1a0092
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/MigrationTest.java
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+public class MigrationTest {
+    @Test
+    public void combineShouldNotExecuteSecondMigrationExecutionWhenTheFirstOneIsFailing() {
+        AtomicBoolean migration2Done = new AtomicBoolean(false);
+
+        Migration migration1 = () -> Migration.Result.PARTIAL;
+        Migration migration2 = () -> {
+            migration2Done.set(true);
+            return Migration.Result.COMPLETED;
+        };
+
+        Migration.combine(migration1, migration2).run();
+
+        assertThat(migration2Done).isFalse();
+    }
+
+    @Test
+    public void combineShouldTriggerSecondMigrationWhenTheFirstOneSucceed() {
+        AtomicBoolean migration2Done = new AtomicBoolean(false);
+
+        Migration migration1 = () -> Migration.Result.COMPLETED;
+        Migration migration2 = () -> {
+            migration2Done.set(true);
+            return Migration.Result.COMPLETED;
+        };
+
+        Migration.combine(migration1, migration2).run();
+
+        assertThat(migration2Done).isTrue();
+    }
+
+    @Test
+    public void combineShouldExecuteTheFirstMigrationWhenSecondWillFail() {
+        AtomicBoolean migration1Done = new AtomicBoolean(false);
+
+        Migration migration1 = () -> {
+            migration1Done.set(true);
+            return Migration.Result.COMPLETED;
+        };
+        Migration migration2 = () -> Migration.Result.PARTIAL;
+
+
+        Migration.combine(migration1, migration2).run();
+
+        assertThat(migration1Done).isTrue();
+    }
+
+    @Test
+    public void combineShouldExecuteTheFirstMigration() {
+        AtomicBoolean migration1Done = new AtomicBoolean(false);
+
+        Migration migration1 = () -> {
+            migration1Done.set(true);
+            return Migration.Result.COMPLETED;
+        };
+        Migration migration2 = () -> Migration.Result.COMPLETED;
+
+        Migration.combine(migration1, migration2).run();
+
+        assertThat(migration1Done).isTrue();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
index 37e8358..d43c8a6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
@@ -25,6 +25,7 @@ import org.apache.james.backends.cassandra.migration.Migration;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO.MessageIdAttachmentIds;
+import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,7 @@ public class AttachmentMessageIdCreation implements Migration {
             return cassandraMessageDAO.retrieveAllMessageIdAttachmentIds()
                 .join()
                 .map(this::createIndex)
-                .reduce(Result.COMPLETED, Migration::combine);
+                .reduce(Result.COMPLETED, Task::combine);
         } catch (Exception e) {
             LOGGER.error("Error while creation attachmentId -> messageIds index", e);
             return Result.PARTIAL;

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
index ca5c116..0d33e36 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -26,6 +26,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
 import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
 import org.apache.james.mailbox.model.Attachment;
+import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class AttachmentV2Migration implements Migration {
         try {
             return attachmentDAOV1.retrieveAll()
                 .map(this::migrateAttachment)
-                .reduce(Result.COMPLETED, Migration::combine);
+                .reduce(Result.COMPLETED, Task::combine);
         } catch (Exception e) {
             LOGGER.error("Error while performing attachmentDAO V2 migration", e);
             return Result.PARTIAL;

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java
index 575bd30..981fbf5 100644
--- a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java
@@ -20,6 +20,7 @@
 package org.apache.james.webadmin.integration;
 
 import static com.jayway.restassured.RestAssured.given;
+import static com.jayway.restassured.RestAssured.with;
 import static com.jayway.restassured.config.EncoderConfig.encoderConfig;
 import static com.jayway.restassured.config.RestAssuredConfig.newConfig;
 import static org.apache.james.webadmin.Constants.JSON_CONTENT_TYPE;
@@ -30,6 +31,8 @@ import static org.hamcrest.Matchers.is;
 
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.james.CassandraJmapTestRule;
 import org.apache.james.DockerCassandraRule;
@@ -37,6 +40,8 @@ import org.apache.james.GuiceJamesServer;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.modules.MailboxProbeImpl;
 import org.apache.james.probe.DataProbe;
+import org.apache.james.task.TaskManager;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.apache.james.utils.DataProbeImpl;
 import org.apache.james.utils.WebAdminGuiceProbe;
 import org.apache.james.webadmin.routes.DomainsRoutes;
@@ -49,6 +54,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
 import com.jayway.restassured.RestAssured;
 import com.jayway.restassured.builder.RequestSpecBuilder;
@@ -226,13 +232,16 @@ public class WebAdminServerIntegrationTest {
 
     @Test
     public void postShouldDoMigrationAndUpdateCurrentVersion() throws Exception {
-        given()
+        String taskId = with()
             .port(webAdminGuiceProbe.getWebAdminPort())
             .body(String.valueOf(CassandraSchemaVersionManager.MAX_VERSION))
-        .when()
-            .post(UPGRADE_VERSION)
-        .then()
-            .statusCode(HttpStatus.NO_CONTENT_204);
+        .post(UPGRADE_VERSION)
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .port(webAdminGuiceProbe.getWebAdminPort())
+            .get("/task/" + taskId + "/await");
 
         given()
             .port(webAdminGuiceProbe.getWebAdminPort())
@@ -246,12 +255,15 @@ public class WebAdminServerIntegrationTest {
 
     @Test
     public void postShouldDoMigrationAndUpdateToTheLatestVersion() throws Exception {
-        given()
+        String taskId = with()
             .port(webAdminGuiceProbe.getWebAdminPort())
-        .when()
-            .post(UPGRADE_TO_LATEST_VERSION)
-        .then()
-            .statusCode(HttpStatus.OK_200);
+        .post(UPGRADE_TO_LATEST_VERSION)
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .port(webAdminGuiceProbe.getWebAdminPort())
+            .get("/task/" + taskId + "/await");
 
         given()
             .port(webAdminGuiceProbe.getWebAdminPort())
@@ -264,6 +276,40 @@ public class WebAdminServerIntegrationTest {
     }
 
     @Test
+    public void concurrentMigrationIsNotAllowed() throws Exception {
+        ConcurrentLinkedQueue<String> taskIds = new ConcurrentLinkedQueue<>();
+        int threadCount = 2;
+        int operationCount = 1;
+        new ConcurrentTestRunner(threadCount, operationCount, (a, b) -> {
+            String migrationId = with()
+                .port(webAdminGuiceProbe.getWebAdminPort())
+                .post(UPGRADE_TO_LATEST_VERSION)
+                .jsonPath()
+                .get("taskId");
+            taskIds.add(migrationId);
+        }).run()
+            .awaitTermination(1, TimeUnit.MINUTES);
+
+        String id1 = taskIds.poll();
+        String id2 = taskIds.poll();
+        String status1 = with()
+            .port(webAdminGuiceProbe.getWebAdminPort())
+            .get("/tasks/" + id1 + "/await")
+            .jsonPath()
+            .get("status");
+        String status2 = with()
+            .port(webAdminGuiceProbe.getWebAdminPort())
+            .get("/tasks/" + id2 + "/await")
+            .jsonPath()
+            .get("status");
+
+        assertThat(ImmutableList.of(status1, status2))
+            .containsOnly(
+                TaskManager.Status.COMPLETED.getValue(),
+                TaskManager.Status.FAILED.getValue());
+    }
+
+    @Test
     public void addressGroupsEndpointShouldHandleRequests() throws Exception {
         dataProbe.addAddressMapping("group", "domain.com", "user1@domain.com");
         dataProbe.addAddressMapping("group", "domain.com", "user2@domain.com");

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
index dce04cf..bff6d1f 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
@@ -22,10 +22,13 @@ package org.apache.james.webadmin.routes;
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
-import org.apache.james.backends.cassandra.migration.MigrationException;
-import org.apache.james.webadmin.Constants;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManager;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.dto.CassandraVersionRequest;
+import org.apache.james.webadmin.dto.CassandraVersionResponse;
+import org.apache.james.webadmin.dto.TaskIdDto;
 import org.apache.james.webadmin.utils.ErrorResponder;
 import org.apache.james.webadmin.utils.ErrorResponder.ErrorType;
 import org.apache.james.webadmin.utils.JsonTransformer;
@@ -45,6 +48,7 @@ public class CassandraMigrationRoutes implements Routes {
     private static final String VERSION_UPGRADE_TO_LATEST_BASE = VERSION_UPGRADE_BASE + "/latest";
 
     private final CassandraMigrationService cassandraMigrationService;
+    private final TaskManager taskManager;
     private final JsonTransformer jsonTransformer;
 
     public static String INVALID_VERSION_UPGRADE_REQUEST = "Invalid request for version upgrade";
@@ -52,27 +56,30 @@ public class CassandraMigrationRoutes implements Routes {
     public static String PARTIAL_MIGRATION_PROCESS = "An error lead to partial migration process";
 
     @Inject
-    public CassandraMigrationRoutes(CassandraMigrationService cassandraMigrationService, JsonTransformer jsonTransformer) {
+    public CassandraMigrationRoutes(CassandraMigrationService cassandraMigrationService,
+                                    TaskManager taskManager, JsonTransformer jsonTransformer) {
         this.cassandraMigrationService = cassandraMigrationService;
+        this.taskManager = taskManager;
         this.jsonTransformer = jsonTransformer;
     }
 
     @Override
     public void define(Service service) {
         service.get(VERSION_BASE,
-            (request, response) -> cassandraMigrationService.getCurrentVersion(),
+            (request, response) -> new CassandraVersionResponse(cassandraMigrationService.getCurrentVersion()),
             jsonTransformer);
 
         service.get(VERSION_BASE_LATEST,
-            (request, response) -> cassandraMigrationService.getLatestVersion(),
+            (request, response) -> new CassandraVersionResponse(cassandraMigrationService.getLatestVersion()),
             jsonTransformer);
 
         service.post(VERSION_UPGRADE_BASE, (request, response) -> {
             LOGGER.debug("Cassandra upgrade launched");
             try {
                 CassandraVersionRequest cassandraVersionRequest = CassandraVersionRequest.parse(request.body());
-                cassandraMigrationService.upgradeToVersion(cassandraVersionRequest.getValue());
-                response.status(HttpStatus.NO_CONTENT_204);
+                Migration migration = cassandraMigrationService.upgradeToVersion(cassandraVersionRequest.getValue());
+                TaskId taskId = taskManager.submit(migration);
+                return TaskIdDto.respond(response, taskId);
             } catch (NullPointerException | IllegalArgumentException e) {
                 LOGGER.info(INVALID_VERSION_UPGRADE_REQUEST);
                 throw ErrorResponder.builder()
@@ -89,21 +96,14 @@ public class CassandraMigrationRoutes implements Routes {
                     .message(MIGRATION_REQUEST_CAN_NOT_BE_DONE)
                     .cause(e)
                     .haltError();
-            } catch (MigrationException e) {
-                LOGGER.error(PARTIAL_MIGRATION_PROCESS, e);
-                throw ErrorResponder.builder()
-                    .statusCode(HttpStatus.INTERNAL_SERVER_ERROR_500)
-                    .type(ErrorType.SERVER_ERROR)
-                    .message(PARTIAL_MIGRATION_PROCESS)
-                    .cause(e)
-                    .haltError();
             }
-            return Constants.EMPTY_BODY;
-        });
+        }, jsonTransformer);
 
         service.post(VERSION_UPGRADE_TO_LATEST_BASE, (request, response) -> {
             try {
-                cassandraMigrationService.upgradeToLastVersion();
+                Migration migration = cassandraMigrationService.upgradeToLastVersion();
+                TaskId taskId = taskManager.submit(migration);
+                return TaskIdDto.respond(response, taskId);
             } catch (IllegalStateException e) {
                 LOGGER.info(MIGRATION_REQUEST_CAN_NOT_BE_DONE, e);
                 throw ErrorResponder.builder()
@@ -112,17 +112,7 @@ public class CassandraMigrationRoutes implements Routes {
                     .message(MIGRATION_REQUEST_CAN_NOT_BE_DONE)
                     .cause(e)
                     .haltError();
-            } catch (MigrationException e) {
-                LOGGER.error(PARTIAL_MIGRATION_PROCESS, e);
-                throw ErrorResponder.builder()
-                    .statusCode(HttpStatus.INTERNAL_SERVER_ERROR_500)
-                    .type(ErrorType.SERVER_ERROR)
-                    .message(PARTIAL_MIGRATION_PROCESS)
-                    .cause(e)
-                    .haltError();
             }
-
-            return Constants.EMPTY_BODY;
-        });
+        }, jsonTransformer);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
index f3fc05a..0cc9d9c 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
@@ -21,10 +21,13 @@ package org.apache.james.webadmin.routes;
 
 import static com.jayway.restassured.RestAssured.given;
 import static com.jayway.restassured.RestAssured.when;
+import static com.jayway.restassured.RestAssured.with;
 import static com.jayway.restassured.config.EncoderConfig.encoderConfig;
 import static com.jayway.restassured.config.RestAssuredConfig.newConfig;
 import static org.apache.james.webadmin.WebAdminServer.NO_CONFIGURATION;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -38,9 +41,11 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.backends.cassandra.migration.MigrationTask;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
-import org.apache.james.mailbox.cassandra.mail.migration.Migration;
 import org.apache.james.metrics.logger.DefaultMetricFactory;
+import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.utils.JsonTransformer;
@@ -56,16 +61,16 @@ import com.jayway.restassured.builder.RequestSpecBuilder;
 import com.jayway.restassured.http.ContentType;
 
 public class CassandraMigrationRoutesTest {
-
     private static final Integer LATEST_VERSION = 3;
     private static final Integer CURRENT_VERSION = 2;
     private static final Integer OLDER_VERSION = 1;
     private WebAdminServer webAdminServer;
     private CassandraSchemaVersionDAO schemaVersionDAO;
+    private MemoryTaskManager taskManager;
 
     private void createServer() throws Exception {
         Migration successfulMigration = mock(Migration.class);
-        when(successfulMigration.run()).thenReturn(Migration.MigrationResult.COMPLETED);
+        when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
 
         Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
             .put(OLDER_VERSION, successfulMigration)
@@ -74,9 +79,13 @@ public class CassandraMigrationRoutesTest {
             .build();
         schemaVersionDAO = mock(CassandraSchemaVersionDAO.class);
 
+        taskManager = new MemoryTaskManager();
+        JsonTransformer jsonTransformer = new JsonTransformer();
         webAdminServer = WebAdminUtils.createWebAdminServer(
             new DefaultMetricFactory(),
-            new CassandraMigrationRoutes(new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION), new JsonTransformer()));
+            new CassandraMigrationRoutes(new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION),
+                taskManager, jsonTransformer),
+            new TasksRoutes(taskManager, jsonTransformer));
 
         webAdminServer.configure(NO_CONFIGURATION);
         webAdminServer.await();
@@ -98,6 +107,7 @@ public class CassandraMigrationRoutesTest {
     @After
     public void tearDown() {
         webAdminServer.destroy();
+        taskManager.stop();
     }
 
     @Test
@@ -171,14 +181,20 @@ public class CassandraMigrationRoutesTest {
     public void postShouldDoMigrationToNewVersion() throws Exception {
         when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
 
-        given()
+        String taskId = with()
             .body(String.valueOf(CURRENT_VERSION))
-        .with()
-            .post("/upgrade")
+        .post("/upgrade")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
         .then()
-            .statusCode(HttpStatus.NO_CONTENT_204);
+            .body("status", is("completed"));
 
-        verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion();
+        verify(schemaVersionDAO, times(2)).getCurrentSchemaVersion();
         verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION));
         verifyNoMoreInteractions(schemaVersionDAO);
     }
@@ -213,18 +229,56 @@ public class CassandraMigrationRoutesTest {
     public void postShouldDoMigrationToLatestVersion() throws Exception {
         when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
 
-        when()
+        String taskId = with()
             .post("/upgrade/latest")
-        .then()
-            .statusCode(HttpStatus.OK_200);
+            .jsonPath()
+            .get("taskId");
 
-        verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion();
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        verify(schemaVersionDAO, times(3)).getCurrentSchemaVersion();
         verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION));
         verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION));
         verifyNoMoreInteractions(schemaVersionDAO);
     }
 
     @Test
+    public void postShouldReturnTaskIdAndLocation() throws Exception {
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+        when()
+            .post("/upgrade/latest")
+        .then()
+            .header("Location", is(notNullValue()))
+            .body("taskId", is(notNullValue()));
+    }
+
+    @Test
+    public void createdTaskShouldHaveDetails() throws Exception {
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+        String taskId = with()
+            .post("/upgrade/latest")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is(MigrationTask.CASSANDRA_MIGRATION))
+            .body("additionalInformation.toVersion", is(LATEST_VERSION))
+            .body("startedDate", is(notNullValue()))
+            .body("submitDate", is(notNullValue()))
+            .body("completedDate", is(notNullValue()));
+    }
+
+    @Test
     public void postShouldNotDoMigrationToLatestVersionWhenItIsUpToDate() throws Exception {
         when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1a7139b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/TaskIdDto.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/TaskIdDto.java b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/TaskIdDto.java
index 231d051..64c4321 100644
--- a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/TaskIdDto.java
+++ b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/TaskIdDto.java
@@ -19,11 +19,24 @@
 
 package org.apache.james.webadmin.dto;
 
+import static org.eclipse.jetty.http.HttpHeader.LOCATION;
+
 import java.util.UUID;
 
 import org.apache.james.task.TaskId;
+import org.apache.james.webadmin.routes.TasksRoutes;
+import org.eclipse.jetty.http.HttpStatus;
+
+import spark.Response;
 
 public class TaskIdDto {
+
+    public static TaskIdDto respond(Response response, TaskId taskId) {
+        response.status(HttpStatus.CREATED_201);
+        response.header(LOCATION.asString(), TasksRoutes.BASE + "/" + taskId.toString());
+        return TaskIdDto.from(taskId);
+    }
+
     public static TaskIdDto from(TaskId id) {
         return new TaskIdDto(id.getValue());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org