You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/04 08:13:19 UTC

[04/21] james-project git commit: JAMES-2272 Cassandra migration service should be located in Cassandra back-end

JAMES-2272 Cassandra migration service should be located in Cassandra back-end

That will makes running Cassandra migration more generic and would be a good move toward CLI implementation


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b87d49c7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b87d49c7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b87d49c7

Branch: refs/heads/master
Commit: b87d49c79df4961d9975469babb94f51958dac03
Parents: 3f26590
Author: benwa <bt...@linagora.com>
Authored: Wed Dec 27 11:36:45 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 4 15:00:43 2018 +0700

----------------------------------------------------------------------
 .../migration/CassandraMigrationService.java    |  96 +++++++
 .../cassandra/migration/MigrationException.java |  26 ++
 .../CassandraMigrationServiceTest.java          | 260 ++++++++++++++++++
 .../modules/server/CassandraRoutesModule.java   |   2 +-
 .../routes/CassandraMigrationRoutes.java        |   4 +-
 .../service/CassandraMigrationService.java      |  98 -------
 .../webadmin/service/MigrationException.java    |  26 --
 .../routes/CassandraMigrationRoutesTest.java    |   2 +-
 .../service/CassandraMigrationServiceTest.java  | 261 -------------------
 9 files changed, 386 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
new file mode 100644
index 0000000..0eb80aa
--- /dev/null
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
@@ -0,0 +1,96 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CassandraMigrationService {
+    public static final String LATEST_VERSION = "latestVersion";
+    private final CassandraSchemaVersionDAO schemaVersionDAO;
+    private final int latestVersion;
+    private final Map<Integer, Migration> allMigrationClazz;
+    private final Logger LOG = LoggerFactory.getLogger(CassandraMigrationService.class);
+
+    @Inject
+    public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map<Integer, Migration> allMigrationClazz, @Named(LATEST_VERSION) int latestVersion) {
+        Preconditions.checkArgument(latestVersion >= 0, "The latest version must be positive");
+        this.schemaVersionDAO = schemaVersionDAO;
+        this.latestVersion = latestVersion;
+        this.allMigrationClazz = allMigrationClazz;
+    }
+
+    public Optional<Integer> getCurrentVersion() {
+        return schemaVersionDAO.getCurrentSchemaVersion().join();
+    }
+
+    public Optional<Integer> getLatestVersion() {
+        return Optional.of(latestVersion);
+    }
+
+    public synchronized void upgradeToVersion(int newVersion) {
+        int currentVersion = schemaVersionDAO.getCurrentSchemaVersion().join().orElse(CassandraSchemaVersionManager.DEFAULT_VERSION);
+        if (currentVersion >= newVersion) {
+            throw new IllegalStateException("Current version is already up to date");
+        }
+
+        IntStream.range(currentVersion, newVersion)
+            .boxed()
+            .forEach(this::doMigration);
+    }
+
+    public void upgradeToLastVersion() {
+        upgradeToVersion(latestVersion);
+    }
+
+    private void doMigration(Integer version) {
+        if (allMigrationClazz.containsKey(version)) {
+            LOG.info("Migrating to version {} ", version + 1);
+            Migration.Result migrationResult = allMigrationClazz.get(version).run();
+            if (migrationResult == Migration.Result.COMPLETED) {
+                schemaVersionDAO.updateVersion(version + 1);
+                LOG.info("Migrating to version {} done", version + 1);
+            } else {
+                String message = String.format("Migrating to version %d partially done. " +
+                    "Please check logs for cause of failure and re-run this migration.",
+                    version + 1);
+                LOG.warn(message);
+                throw new MigrationException(message);
+            }
+        } else {
+            String message = String.format("Can not migrate to %d. No migration class registered.", version + 1);
+            LOG.error(message);
+            throw new NotImplementedException(message);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java
new file mode 100644
index 0000000..41aa423
--- /dev/null
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+public class MigrationException extends RuntimeException {
+    public MigrationException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
new file mode 100644
index 0000000..fa2a29f
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
@@ -0,0 +1,260 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.datastax.driver.core.Session;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+public class CassandraMigrationServiceTest {
+    private static final int LATEST_VERSION = 3;
+    private static final int CURRENT_VERSION = 2;
+    private static final int OLDER_VERSION = 1;
+    private CassandraMigrationService testee;
+    private CassandraSchemaVersionDAO schemaVersionDAO;
+    private ExecutorService executorService;
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private Migration successfulMigration;
+
+    @Before
+    public void setUp() throws Exception {
+        schemaVersionDAO = mock(CassandraSchemaVersionDAO.class);
+        successfulMigration = mock(Migration.class);
+        when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
+        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+            .put(OLDER_VERSION, successfulMigration)
+            .put(CURRENT_VERSION, successfulMigration)
+            .put(LATEST_VERSION, successfulMigration)
+            .build();
+        testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
+        executorService = Executors.newFixedThreadPool(2);
+    }
+
+    @After
+    public void tearDown() {
+        executorService.shutdownNow();
+    }
+
+    @Test
+    public void getCurrentVersionShouldReturnCurrentVersion() throws Exception {
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
+
+        assertThat(testee.getCurrentVersion().get()).isEqualTo(CURRENT_VERSION);
+    }
+
+    @Test
+    public void getLatestVersionShouldReturnTheLatestVersion() throws Exception {
+        assertThat(testee.getLatestVersion().get()).isEqualTo(LATEST_VERSION);
+    }
+
+    @Test
+    public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception {
+        expectedException.expect(IllegalStateException.class);
+
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
+
+        testee.upgradeToVersion(OLDER_VERSION);
+    }
+
+    @Test
+    public void upgradeToVersionShouldUpdateToVersion() throws Exception {
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+        testee.upgradeToVersion(CURRENT_VERSION);
+
+        verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION));
+    }
+
+    @Test
+    public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception {
+        expectedException.expect(IllegalStateException.class);
+
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));
+
+        testee.upgradeToLastVersion();
+    }
+
+    @Test
+    public void upgradeToLastVersionShouldUpdateToLatestVersion() throws Exception {
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+        testee.upgradeToLastVersion();
+
+        verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION));
+    }
+
+    @Test
+    public void upgradeToVersionShouldThrowOnMissingVersion() throws Exception {
+        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+            .put(OLDER_VERSION, successfulMigration)
+            .put(LATEST_VERSION, successfulMigration)
+            .build();
+        testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
+        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+        expectedException.expect(NotImplementedException.class);
+
+        testee.upgradeToVersion(LATEST_VERSION);
+    }
+
+    @Test
+    public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOfError() throws Exception {
+        try {
+            Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+                .put(OLDER_VERSION, successfulMigration)
+                .put(LATEST_VERSION, successfulMigration)
+                .build();
+            testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
+            when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+            expectedException.expect(RuntimeException.class);
+
+            testee.upgradeToVersion(LATEST_VERSION);
+        } finally {
+            verify(schemaVersionDAO).updateVersion(CURRENT_VERSION);
+        }
+    }
+
+    @Test
+    public void concurrentMigrationsShouldFail() throws Exception {
+        // Given a stateful migration service
+        Migration wait1SecondMigration = mock(Migration.class);
+        doAnswer(invocation -> {
+            Thread.sleep(1000);
+            return Migration.Result.COMPLETED;
+        }).when(wait1SecondMigration).run();
+        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+            .put(OLDER_VERSION, wait1SecondMigration)
+            .put(CURRENT_VERSION, wait1SecondMigration)
+            .put(LATEST_VERSION, wait1SecondMigration)
+            .build();
+        testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
+
+        // When I perform a concurrent migration
+        AtomicInteger encounteredExceptionCount = new AtomicInteger(0);
+        executorService.submit(() -> testee.upgradeToVersion(LATEST_VERSION));
+        executorService.submit(() -> {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                throw Throwables.propagate(e);
+            }
+
+            try {
+                testee.upgradeToVersion(LATEST_VERSION);
+            } catch (IllegalStateException e) {
+                encounteredExceptionCount.incrementAndGet();
+            }
+        });
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+
+        // Then the second migration fails
+        assertThat(encounteredExceptionCount.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void partialMigrationShouldThrow() throws Exception {
+        Migration migration1 = mock(Migration.class);
+        when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
+        Migration migration2 = successfulMigration;
+
+        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+            .put(OLDER_VERSION, migration1)
+            .put(CURRENT_VERSION, migration2)
+            .build();
+        testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
+
+        expectedException.expect(MigrationException.class);
+
+        testee.upgradeToVersion(LATEST_VERSION);
+    }
+
+    @Test
+    public void partialMigrationShouldAbortMigrations() throws Exception {
+        Migration migration1 = mock(Migration.class);
+        when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
+        Migration migration2 = mock(Migration.class);
+        when(migration2.run()).thenReturn(Migration.Result.COMPLETED);
+
+        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+            .put(OLDER_VERSION, migration1)
+            .put(CURRENT_VERSION, migration2)
+            .build();
+        testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
+
+        expectedException.expect(MigrationException.class);
+
+        try {
+            testee.upgradeToVersion(LATEST_VERSION);
+        } finally {
+            verify(migration1, times(1)).run();
+            verifyNoMoreInteractions(migration1);
+            verifyZeroInteractions(migration2);
+        }
+    }
+
+    public static class InMemorySchemaDAO extends CassandraSchemaVersionDAO {
+        private int currentVersion;
+
+        public InMemorySchemaDAO(int currentVersion) {
+            super(mock(Session.class), null);
+            this.currentVersion = currentVersion;
+        }
+
+        @Override
+        public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() {
+            return CompletableFuture.completedFuture(Optional.of(currentVersion));
+        }
+
+        @Override
+        public CompletableFuture<Void> updateVersion(int newVersion) {
+            currentVersion = newVersion;
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
index 6fe2792..16b821b 100644
--- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
+++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
@@ -19,13 +19,13 @@
 
 package org.apache.james.modules.server;
 
+import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
 import org.apache.james.backends.cassandra.migration.Migration;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.mailbox.cassandra.mail.migration.AttachmentMessageIdCreation;
 import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.routes.CassandraMigrationRoutes;
-import org.apache.james.webadmin.service.CassandraMigrationService;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
index 3a42657..dce04cf 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
@@ -21,11 +21,11 @@ package org.apache.james.webadmin.routes;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
+import org.apache.james.backends.cassandra.migration.MigrationException;
 import org.apache.james.webadmin.Constants;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.dto.CassandraVersionRequest;
-import org.apache.james.webadmin.service.CassandraMigrationService;
-import org.apache.james.webadmin.service.MigrationException;
 import org.apache.james.webadmin.utils.ErrorResponder;
 import org.apache.james.webadmin.utils.ErrorResponder.ErrorType;
 import org.apache.james.webadmin.utils.JsonTransformer;

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java
deleted file mode 100644
index 004cd60..0000000
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.webadmin.service;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.IntStream;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
-import org.apache.james.mailbox.cassandra.mail.migration.Migration;
-import org.apache.james.webadmin.dto.CassandraVersionResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class CassandraMigrationService {
-    public static final String LATEST_VERSION = "latestVersion";
-    private final CassandraSchemaVersionDAO schemaVersionDAO;
-    private final int latestVersion;
-    private final Map<Integer, Migration> allMigrationClazz;
-    private final Logger LOG = LoggerFactory.getLogger(CassandraMigrationService.class);
-
-    @Inject
-    public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map<Integer, Migration> allMigrationClazz, @Named(LATEST_VERSION) int latestVersion) {
-        Preconditions.checkArgument(latestVersion >= 0, "The latest version must be positive");
-        this.schemaVersionDAO = schemaVersionDAO;
-        this.latestVersion = latestVersion;
-        this.allMigrationClazz = allMigrationClazz;
-    }
-
-    public CassandraVersionResponse getCurrentVersion() {
-        return new CassandraVersionResponse(schemaVersionDAO.getCurrentSchemaVersion().join());
-    }
-
-    public CassandraVersionResponse getLatestVersion() {
-        return new CassandraVersionResponse(Optional.of(latestVersion));
-    }
-
-    public synchronized void upgradeToVersion(int newVersion) {
-        int currentVersion = schemaVersionDAO.getCurrentSchemaVersion().join().orElse(CassandraSchemaVersionManager.DEFAULT_VERSION);
-        if (currentVersion >= newVersion) {
-            throw new IllegalStateException("Current version is already up to date");
-        }
-
-        IntStream.range(currentVersion, newVersion)
-            .boxed()
-            .forEach(this::doMigration);
-    }
-
-    public void upgradeToLastVersion() {
-        upgradeToVersion(latestVersion);
-    }
-
-    private void doMigration(Integer version) {
-        if (allMigrationClazz.containsKey(version)) {
-            LOG.info("Migrating to version {} ", version + 1);
-            Migration.MigrationResult migrationResult = allMigrationClazz.get(version).run();
-            if (migrationResult == Migration.MigrationResult.COMPLETED) {
-                schemaVersionDAO.updateVersion(version + 1);
-                LOG.info("Migrating to version {} done", version + 1);
-            } else {
-                String message = String.format("Migrating to version %d partially done. " +
-                    "Please check logs for cause of failure and re-run this migration.",
-                    version + 1);
-                LOG.warn(message);
-                throw new MigrationException(message);
-            }
-        } else {
-            String message = String.format("Can not migrate to %d. No migration class registered.", version + 1);
-            LOG.error(message);
-            throw new NotImplementedException(message);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java
deleted file mode 100644
index 7b9d74f..0000000
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.webadmin.service;
-
-public class MigrationException extends RuntimeException {
-    public MigrationException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
index 1115e87..f3fc05a 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
@@ -37,12 +37,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.mailbox.cassandra.mail.migration.Migration;
 import org.apache.james.metrics.logger.DefaultMetricFactory;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
-import org.apache.james.webadmin.service.CassandraMigrationService;
 import org.apache.james.webadmin.utils.JsonTransformer;
 import org.eclipse.jetty.http.HttpStatus;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java
deleted file mode 100644
index 0e7496f..0000000
--- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.webadmin.service;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.james.backends.cassandra.migration.Migration;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import com.datastax.driver.core.Session;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-
-public class CassandraMigrationServiceTest {
-    private static final int LATEST_VERSION = 3;
-    private static final int CURRENT_VERSION = 2;
-    private static final int OLDER_VERSION = 1;
-    private CassandraMigrationService testee;
-    private CassandraSchemaVersionDAO schemaVersionDAO;
-    private ExecutorService executorService;
-
-    @Rule
-    public ExpectedException expectedException = ExpectedException.none();
-    private Migration successfulMigration;
-
-    @Before
-    public void setUp() throws Exception {
-        schemaVersionDAO = mock(CassandraSchemaVersionDAO.class);
-        successfulMigration = mock(Migration.class);
-        when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
-        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
-            .put(OLDER_VERSION, successfulMigration)
-            .put(CURRENT_VERSION, successfulMigration)
-            .put(LATEST_VERSION, successfulMigration)
-            .build();
-        testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
-        executorService = Executors.newFixedThreadPool(2);
-    }
-
-    @After
-    public void tearDown() {
-        executorService.shutdownNow();
-    }
-
-    @Test
-    public void getCurrentVersionShouldReturnCurrentVersion() throws Exception {
-        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
-
-        assertThat(testee.getCurrentVersion().getversion().get()).isEqualTo(CURRENT_VERSION);
-    }
-
-    @Test
-    public void getLatestVersionShouldReturnTheLatestVersion() throws Exception {
-        assertThat(testee.getLatestVersion().getversion().get()).isEqualTo(LATEST_VERSION);
-    }
-
-    @Test
-    public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception {
-        expectedException.expect(IllegalStateException.class);
-
-        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
-
-        testee.upgradeToVersion(OLDER_VERSION);
-    }
-
-    @Test
-    public void upgradeToVersionShouldUpdateToVersion() throws Exception {
-        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
-        testee.upgradeToVersion(CURRENT_VERSION);
-
-        verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION));
-    }
-
-    @Test
-    public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception {
-        expectedException.expect(IllegalStateException.class);
-
-        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));
-
-        testee.upgradeToLastVersion();
-    }
-
-    @Test
-    public void upgradeToLastVersionShouldUpdateToLatestVersion() throws Exception {
-        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
-        testee.upgradeToLastVersion();
-
-        verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION));
-    }
-
-    @Test
-    public void upgradeToVersionShouldThrowOnMissingVersion() throws Exception {
-        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
-            .put(OLDER_VERSION, successfulMigration)
-            .put(LATEST_VERSION, successfulMigration)
-            .build();
-        testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
-        when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
-        expectedException.expect(NotImplementedException.class);
-
-        testee.upgradeToVersion(LATEST_VERSION);
-    }
-
-    @Test
-    public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOfError() throws Exception {
-        try {
-            Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
-                .put(OLDER_VERSION, successfulMigration)
-                .put(LATEST_VERSION, successfulMigration)
-                .build();
-            testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
-            when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
-            expectedException.expect(RuntimeException.class);
-
-            testee.upgradeToVersion(LATEST_VERSION);
-        } finally {
-            verify(schemaVersionDAO).updateVersion(CURRENT_VERSION);
-        }
-    }
-
-    @Test
-    public void concurrentMigrationsShouldFail() throws Exception {
-        // Given a stateful migration service
-        Migration wait1SecondMigration = mock(Migration.class);
-        doAnswer(invocation -> {
-            Thread.sleep(1000);
-            return Migration.Result.COMPLETED;
-        }).when(wait1SecondMigration).run();
-        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
-            .put(OLDER_VERSION, wait1SecondMigration)
-            .put(CURRENT_VERSION, wait1SecondMigration)
-            .put(LATEST_VERSION, wait1SecondMigration)
-            .build();
-        testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
-
-        // When I perform a concurrent migration
-        AtomicInteger encounteredExceptionCount = new AtomicInteger(0);
-        executorService.submit(() -> testee.upgradeToVersion(LATEST_VERSION));
-        executorService.submit(() -> {
-            try {
-                Thread.sleep(500);
-            } catch (InterruptedException e) {
-                throw Throwables.propagate(e);
-            }
-
-            try {
-                testee.upgradeToVersion(LATEST_VERSION);
-            } catch (IllegalStateException e) {
-                encounteredExceptionCount.incrementAndGet();
-            }
-        });
-        executorService.awaitTermination(10, TimeUnit.SECONDS);
-
-        // Then the second migration fails
-        assertThat(encounteredExceptionCount.get()).isEqualTo(1);
-    }
-
-    @Test
-    public void partialMigrationShouldThrow() throws Exception {
-        Migration migration1 = mock(Migration.class);
-        when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
-        Migration migration2 = successfulMigration;
-
-        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
-            .put(OLDER_VERSION, migration1)
-            .put(CURRENT_VERSION, migration2)
-            .build();
-        testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
-
-        expectedException.expect(MigrationException.class);
-
-        testee.upgradeToVersion(LATEST_VERSION);
-    }
-
-    @Test
-    public void partialMigrationShouldAbortMigrations() throws Exception {
-        Migration migration1 = mock(Migration.class);
-        when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
-        Migration migration2 = mock(Migration.class);
-        when(migration2.run()).thenReturn(Migration.Result.COMPLETED);
-
-        Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
-            .put(OLDER_VERSION, migration1)
-            .put(CURRENT_VERSION, migration2)
-            .build();
-        testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
-
-        expectedException.expect(MigrationException.class);
-
-        try {
-            testee.upgradeToVersion(LATEST_VERSION);
-        } finally {
-            verify(migration1, times(1)).run();
-            verifyNoMoreInteractions(migration1);
-            verifyZeroInteractions(migration2);
-        }
-    }
-
-    public static class InMemorySchemaDAO extends CassandraSchemaVersionDAO {
-        private int currentVersion;
-
-        public InMemorySchemaDAO(int currentVersion) {
-            super(mock(Session.class), null);
-            this.currentVersion = currentVersion;
-        }
-
-        @Override
-        public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() {
-            return CompletableFuture.completedFuture(Optional.of(currentVersion));
-        }
-
-        @Override
-        public CompletableFuture<Void> updateVersion(int newVersion) {
-            currentVersion = newVersion;
-            return CompletableFuture.completedFuture(null);
-        }
-    }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org