You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/04 08:13:19 UTC
[04/21] james-project git commit: JAMES-2272 Cassandra migration
service should be located in Cassandra back-end
JAMES-2272 Cassandra migration service should be located in Cassandra back-end
That will makes running Cassandra migration more generic and would be a good move toward CLI implementation
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b87d49c7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b87d49c7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b87d49c7
Branch: refs/heads/master
Commit: b87d49c79df4961d9975469babb94f51958dac03
Parents: 3f26590
Author: benwa <bt...@linagora.com>
Authored: Wed Dec 27 11:36:45 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 4 15:00:43 2018 +0700
----------------------------------------------------------------------
.../migration/CassandraMigrationService.java | 96 +++++++
.../cassandra/migration/MigrationException.java | 26 ++
.../CassandraMigrationServiceTest.java | 260 ++++++++++++++++++
.../modules/server/CassandraRoutesModule.java | 2 +-
.../routes/CassandraMigrationRoutes.java | 4 +-
.../service/CassandraMigrationService.java | 98 -------
.../webadmin/service/MigrationException.java | 26 --
.../routes/CassandraMigrationRoutesTest.java | 2 +-
.../service/CassandraMigrationServiceTest.java | 261 -------------------
9 files changed, 386 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
new file mode 100644
index 0000000..0eb80aa
--- /dev/null
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/CassandraMigrationService.java
@@ -0,0 +1,96 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CassandraMigrationService {
+ public static final String LATEST_VERSION = "latestVersion";
+ private final CassandraSchemaVersionDAO schemaVersionDAO;
+ private final int latestVersion;
+ private final Map<Integer, Migration> allMigrationClazz;
+ private final Logger LOG = LoggerFactory.getLogger(CassandraMigrationService.class);
+
+ @Inject
+ public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map<Integer, Migration> allMigrationClazz, @Named(LATEST_VERSION) int latestVersion) {
+ Preconditions.checkArgument(latestVersion >= 0, "The latest version must be positive");
+ this.schemaVersionDAO = schemaVersionDAO;
+ this.latestVersion = latestVersion;
+ this.allMigrationClazz = allMigrationClazz;
+ }
+
+ public Optional<Integer> getCurrentVersion() {
+ return schemaVersionDAO.getCurrentSchemaVersion().join();
+ }
+
+ public Optional<Integer> getLatestVersion() {
+ return Optional.of(latestVersion);
+ }
+
+ public synchronized void upgradeToVersion(int newVersion) {
+ int currentVersion = schemaVersionDAO.getCurrentSchemaVersion().join().orElse(CassandraSchemaVersionManager.DEFAULT_VERSION);
+ if (currentVersion >= newVersion) {
+ throw new IllegalStateException("Current version is already up to date");
+ }
+
+ IntStream.range(currentVersion, newVersion)
+ .boxed()
+ .forEach(this::doMigration);
+ }
+
+ public void upgradeToLastVersion() {
+ upgradeToVersion(latestVersion);
+ }
+
+ private void doMigration(Integer version) {
+ if (allMigrationClazz.containsKey(version)) {
+ LOG.info("Migrating to version {} ", version + 1);
+ Migration.Result migrationResult = allMigrationClazz.get(version).run();
+ if (migrationResult == Migration.Result.COMPLETED) {
+ schemaVersionDAO.updateVersion(version + 1);
+ LOG.info("Migrating to version {} done", version + 1);
+ } else {
+ String message = String.format("Migrating to version %d partially done. " +
+ "Please check logs for cause of failure and re-run this migration.",
+ version + 1);
+ LOG.warn(message);
+ throw new MigrationException(message);
+ }
+ } else {
+ String message = String.format("Can not migrate to %d. No migration class registered.", version + 1);
+ LOG.error(message);
+ throw new NotImplementedException(message);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java
new file mode 100644
index 0000000..41aa423
--- /dev/null
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/migration/MigrationException.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+public class MigrationException extends RuntimeException {
+ public MigrationException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
new file mode 100644
index 0000000..fa2a29f
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
@@ -0,0 +1,260 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.datastax.driver.core.Session;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+public class CassandraMigrationServiceTest {
+ private static final int LATEST_VERSION = 3;
+ private static final int CURRENT_VERSION = 2;
+ private static final int OLDER_VERSION = 1;
+ private CassandraMigrationService testee;
+ private CassandraSchemaVersionDAO schemaVersionDAO;
+ private ExecutorService executorService;
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ private Migration successfulMigration;
+
+ @Before
+ public void setUp() throws Exception {
+ schemaVersionDAO = mock(CassandraSchemaVersionDAO.class);
+ successfulMigration = mock(Migration.class);
+ when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
+ Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+ .put(OLDER_VERSION, successfulMigration)
+ .put(CURRENT_VERSION, successfulMigration)
+ .put(LATEST_VERSION, successfulMigration)
+ .build();
+ testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
+ executorService = Executors.newFixedThreadPool(2);
+ }
+
+ @After
+ public void tearDown() {
+ executorService.shutdownNow();
+ }
+
+ @Test
+ public void getCurrentVersionShouldReturnCurrentVersion() throws Exception {
+ when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
+
+ assertThat(testee.getCurrentVersion().get()).isEqualTo(CURRENT_VERSION);
+ }
+
+ @Test
+ public void getLatestVersionShouldReturnTheLatestVersion() throws Exception {
+ assertThat(testee.getLatestVersion().get()).isEqualTo(LATEST_VERSION);
+ }
+
+ @Test
+ public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception {
+ expectedException.expect(IllegalStateException.class);
+
+ when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
+
+ testee.upgradeToVersion(OLDER_VERSION);
+ }
+
+ @Test
+ public void upgradeToVersionShouldUpdateToVersion() throws Exception {
+ when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+ testee.upgradeToVersion(CURRENT_VERSION);
+
+ verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION));
+ }
+
+ @Test
+ public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception {
+ expectedException.expect(IllegalStateException.class);
+
+ when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));
+
+ testee.upgradeToLastVersion();
+ }
+
+ @Test
+ public void upgradeToLastVersionShouldUpdateToLatestVersion() throws Exception {
+ when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+ testee.upgradeToLastVersion();
+
+ verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION));
+ }
+
+ @Test
+ public void upgradeToVersionShouldThrowOnMissingVersion() throws Exception {
+ Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+ .put(OLDER_VERSION, successfulMigration)
+ .put(LATEST_VERSION, successfulMigration)
+ .build();
+ testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
+ when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+ expectedException.expect(NotImplementedException.class);
+
+ testee.upgradeToVersion(LATEST_VERSION);
+ }
+
+ @Test
+ public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOfError() throws Exception {
+ try {
+ Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+ .put(OLDER_VERSION, successfulMigration)
+ .put(LATEST_VERSION, successfulMigration)
+ .build();
+ testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
+ when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
+
+ expectedException.expect(RuntimeException.class);
+
+ testee.upgradeToVersion(LATEST_VERSION);
+ } finally {
+ verify(schemaVersionDAO).updateVersion(CURRENT_VERSION);
+ }
+ }
+
+ @Test
+ public void concurrentMigrationsShouldFail() throws Exception {
+ // Given a stateful migration service
+ Migration wait1SecondMigration = mock(Migration.class);
+ doAnswer(invocation -> {
+ Thread.sleep(1000);
+ return Migration.Result.COMPLETED;
+ }).when(wait1SecondMigration).run();
+ Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+ .put(OLDER_VERSION, wait1SecondMigration)
+ .put(CURRENT_VERSION, wait1SecondMigration)
+ .put(LATEST_VERSION, wait1SecondMigration)
+ .build();
+ testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
+
+ // When I perform a concurrent migration
+ AtomicInteger encounteredExceptionCount = new AtomicInteger(0);
+ executorService.submit(() -> testee.upgradeToVersion(LATEST_VERSION));
+ executorService.submit(() -> {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+
+ try {
+ testee.upgradeToVersion(LATEST_VERSION);
+ } catch (IllegalStateException e) {
+ encounteredExceptionCount.incrementAndGet();
+ }
+ });
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+
+ // Then the second migration fails
+ assertThat(encounteredExceptionCount.get()).isEqualTo(1);
+ }
+
+ @Test
+ public void partialMigrationShouldThrow() throws Exception {
+ Migration migration1 = mock(Migration.class);
+ when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
+ Migration migration2 = successfulMigration;
+
+ Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+ .put(OLDER_VERSION, migration1)
+ .put(CURRENT_VERSION, migration2)
+ .build();
+ testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
+
+ expectedException.expect(MigrationException.class);
+
+ testee.upgradeToVersion(LATEST_VERSION);
+ }
+
+ @Test
+ public void partialMigrationShouldAbortMigrations() throws Exception {
+ Migration migration1 = mock(Migration.class);
+ when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
+ Migration migration2 = mock(Migration.class);
+ when(migration2.run()).thenReturn(Migration.Result.COMPLETED);
+
+ Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
+ .put(OLDER_VERSION, migration1)
+ .put(CURRENT_VERSION, migration2)
+ .build();
+ testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
+
+ expectedException.expect(MigrationException.class);
+
+ try {
+ testee.upgradeToVersion(LATEST_VERSION);
+ } finally {
+ verify(migration1, times(1)).run();
+ verifyNoMoreInteractions(migration1);
+ verifyZeroInteractions(migration2);
+ }
+ }
+
+ public static class InMemorySchemaDAO extends CassandraSchemaVersionDAO {
+ private int currentVersion;
+
+ public InMemorySchemaDAO(int currentVersion) {
+ super(mock(Session.class), null);
+ this.currentVersion = currentVersion;
+ }
+
+ @Override
+ public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() {
+ return CompletableFuture.completedFuture(Optional.of(currentVersion));
+ }
+
+ @Override
+ public CompletableFuture<Void> updateVersion(int newVersion) {
+ currentVersion = newVersion;
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
index 6fe2792..16b821b 100644
--- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
+++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
@@ -19,13 +19,13 @@
package org.apache.james.modules.server;
+import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
import org.apache.james.backends.cassandra.migration.Migration;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
import org.apache.james.mailbox.cassandra.mail.migration.AttachmentMessageIdCreation;
import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration;
import org.apache.james.webadmin.Routes;
import org.apache.james.webadmin.routes.CassandraMigrationRoutes;
-import org.apache.james.webadmin.service.CassandraMigrationService;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
index 3a42657..dce04cf 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/CassandraMigrationRoutes.java
@@ -21,11 +21,11 @@ package org.apache.james.webadmin.routes;
import javax.inject.Inject;
+import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
+import org.apache.james.backends.cassandra.migration.MigrationException;
import org.apache.james.webadmin.Constants;
import org.apache.james.webadmin.Routes;
import org.apache.james.webadmin.dto.CassandraVersionRequest;
-import org.apache.james.webadmin.service.CassandraMigrationService;
-import org.apache.james.webadmin.service.MigrationException;
import org.apache.james.webadmin.utils.ErrorResponder;
import org.apache.james.webadmin.utils.ErrorResponder.ErrorType;
import org.apache.james.webadmin.utils.JsonTransformer;
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java
deleted file mode 100644
index 004cd60..0000000
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.webadmin.service;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.IntStream;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
-import org.apache.james.mailbox.cassandra.mail.migration.Migration;
-import org.apache.james.webadmin.dto.CassandraVersionResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class CassandraMigrationService {
- public static final String LATEST_VERSION = "latestVersion";
- private final CassandraSchemaVersionDAO schemaVersionDAO;
- private final int latestVersion;
- private final Map<Integer, Migration> allMigrationClazz;
- private final Logger LOG = LoggerFactory.getLogger(CassandraMigrationService.class);
-
- @Inject
- public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map<Integer, Migration> allMigrationClazz, @Named(LATEST_VERSION) int latestVersion) {
- Preconditions.checkArgument(latestVersion >= 0, "The latest version must be positive");
- this.schemaVersionDAO = schemaVersionDAO;
- this.latestVersion = latestVersion;
- this.allMigrationClazz = allMigrationClazz;
- }
-
- public CassandraVersionResponse getCurrentVersion() {
- return new CassandraVersionResponse(schemaVersionDAO.getCurrentSchemaVersion().join());
- }
-
- public CassandraVersionResponse getLatestVersion() {
- return new CassandraVersionResponse(Optional.of(latestVersion));
- }
-
- public synchronized void upgradeToVersion(int newVersion) {
- int currentVersion = schemaVersionDAO.getCurrentSchemaVersion().join().orElse(CassandraSchemaVersionManager.DEFAULT_VERSION);
- if (currentVersion >= newVersion) {
- throw new IllegalStateException("Current version is already up to date");
- }
-
- IntStream.range(currentVersion, newVersion)
- .boxed()
- .forEach(this::doMigration);
- }
-
- public void upgradeToLastVersion() {
- upgradeToVersion(latestVersion);
- }
-
- private void doMigration(Integer version) {
- if (allMigrationClazz.containsKey(version)) {
- LOG.info("Migrating to version {} ", version + 1);
- Migration.MigrationResult migrationResult = allMigrationClazz.get(version).run();
- if (migrationResult == Migration.MigrationResult.COMPLETED) {
- schemaVersionDAO.updateVersion(version + 1);
- LOG.info("Migrating to version {} done", version + 1);
- } else {
- String message = String.format("Migrating to version %d partially done. " +
- "Please check logs for cause of failure and re-run this migration.",
- version + 1);
- LOG.warn(message);
- throw new MigrationException(message);
- }
- } else {
- String message = String.format("Can not migrate to %d. No migration class registered.", version + 1);
- LOG.error(message);
- throw new NotImplementedException(message);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java
deleted file mode 100644
index 7b9d74f..0000000
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/MigrationException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.webadmin.service;
-
-public class MigrationException extends RuntimeException {
- public MigrationException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
index 1115e87..f3fc05a 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
@@ -37,12 +37,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import org.apache.james.backends.cassandra.migration.CassandraMigrationService;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
import org.apache.james.mailbox.cassandra.mail.migration.Migration;
import org.apache.james.metrics.logger.DefaultMetricFactory;
import org.apache.james.webadmin.WebAdminServer;
import org.apache.james.webadmin.WebAdminUtils;
-import org.apache.james.webadmin.service.CassandraMigrationService;
import org.apache.james.webadmin.utils.JsonTransformer;
import org.eclipse.jetty.http.HttpStatus;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/james-project/blob/b87d49c7/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java
deleted file mode 100644
index 0e7496f..0000000
--- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/service/CassandraMigrationServiceTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.webadmin.service;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.james.backends.cassandra.migration.Migration;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import com.datastax.driver.core.Session;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-
-public class CassandraMigrationServiceTest {
- private static final int LATEST_VERSION = 3;
- private static final int CURRENT_VERSION = 2;
- private static final int OLDER_VERSION = 1;
- private CassandraMigrationService testee;
- private CassandraSchemaVersionDAO schemaVersionDAO;
- private ExecutorService executorService;
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
- private Migration successfulMigration;
-
- @Before
- public void setUp() throws Exception {
- schemaVersionDAO = mock(CassandraSchemaVersionDAO.class);
- successfulMigration = mock(Migration.class);
- when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
- Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
- .put(OLDER_VERSION, successfulMigration)
- .put(CURRENT_VERSION, successfulMigration)
- .put(LATEST_VERSION, successfulMigration)
- .build();
- testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
- executorService = Executors.newFixedThreadPool(2);
- }
-
- @After
- public void tearDown() {
- executorService.shutdownNow();
- }
-
- @Test
- public void getCurrentVersionShouldReturnCurrentVersion() throws Exception {
- when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
-
- assertThat(testee.getCurrentVersion().getversion().get()).isEqualTo(CURRENT_VERSION);
- }
-
- @Test
- public void getLatestVersionShouldReturnTheLatestVersion() throws Exception {
- assertThat(testee.getLatestVersion().getversion().get()).isEqualTo(LATEST_VERSION);
- }
-
- @Test
- public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception {
- expectedException.expect(IllegalStateException.class);
-
- when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
-
- testee.upgradeToVersion(OLDER_VERSION);
- }
-
- @Test
- public void upgradeToVersionShouldUpdateToVersion() throws Exception {
- when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
- testee.upgradeToVersion(CURRENT_VERSION);
-
- verify(schemaVersionDAO, times(1)).updateVersion(eq(CURRENT_VERSION));
- }
-
- @Test
- public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception {
- expectedException.expect(IllegalStateException.class);
-
- when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));
-
- testee.upgradeToLastVersion();
- }
-
- @Test
- public void upgradeToLastVersionShouldUpdateToLatestVersion() throws Exception {
- when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
- testee.upgradeToLastVersion();
-
- verify(schemaVersionDAO, times(1)).updateVersion(eq(LATEST_VERSION));
- }
-
- @Test
- public void upgradeToVersionShouldThrowOnMissingVersion() throws Exception {
- Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
- .put(OLDER_VERSION, successfulMigration)
- .put(LATEST_VERSION, successfulMigration)
- .build();
- testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
- when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
- expectedException.expect(NotImplementedException.class);
-
- testee.upgradeToVersion(LATEST_VERSION);
- }
-
- @Test
- public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOfError() throws Exception {
- try {
- Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
- .put(OLDER_VERSION, successfulMigration)
- .put(LATEST_VERSION, successfulMigration)
- .build();
- testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
- when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
-
- expectedException.expect(RuntimeException.class);
-
- testee.upgradeToVersion(LATEST_VERSION);
- } finally {
- verify(schemaVersionDAO).updateVersion(CURRENT_VERSION);
- }
- }
-
- @Test
- public void concurrentMigrationsShouldFail() throws Exception {
- // Given a stateful migration service
- Migration wait1SecondMigration = mock(Migration.class);
- doAnswer(invocation -> {
- Thread.sleep(1000);
- return Migration.Result.COMPLETED;
- }).when(wait1SecondMigration).run();
- Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
- .put(OLDER_VERSION, wait1SecondMigration)
- .put(CURRENT_VERSION, wait1SecondMigration)
- .put(LATEST_VERSION, wait1SecondMigration)
- .build();
- testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
-
- // When I perform a concurrent migration
- AtomicInteger encounteredExceptionCount = new AtomicInteger(0);
- executorService.submit(() -> testee.upgradeToVersion(LATEST_VERSION));
- executorService.submit(() -> {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- throw Throwables.propagate(e);
- }
-
- try {
- testee.upgradeToVersion(LATEST_VERSION);
- } catch (IllegalStateException e) {
- encounteredExceptionCount.incrementAndGet();
- }
- });
- executorService.awaitTermination(10, TimeUnit.SECONDS);
-
- // Then the second migration fails
- assertThat(encounteredExceptionCount.get()).isEqualTo(1);
- }
-
- @Test
- public void partialMigrationShouldThrow() throws Exception {
- Migration migration1 = mock(Migration.class);
- when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
- Migration migration2 = successfulMigration;
-
- Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
- .put(OLDER_VERSION, migration1)
- .put(CURRENT_VERSION, migration2)
- .build();
- testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
-
- expectedException.expect(MigrationException.class);
-
- testee.upgradeToVersion(LATEST_VERSION);
- }
-
- @Test
- public void partialMigrationShouldAbortMigrations() throws Exception {
- Migration migration1 = mock(Migration.class);
- when(migration1.run()).thenReturn(Migration.Result.PARTIAL);
- Migration migration2 = mock(Migration.class);
- when(migration2.run()).thenReturn(Migration.Result.COMPLETED);
-
- Map<Integer, Migration> allMigrationClazz = ImmutableMap.<Integer, Migration>builder()
- .put(OLDER_VERSION, migration1)
- .put(CURRENT_VERSION, migration2)
- .build();
- testee = new CassandraMigrationService(new InMemorySchemaDAO(OLDER_VERSION), allMigrationClazz, LATEST_VERSION);
-
- expectedException.expect(MigrationException.class);
-
- try {
- testee.upgradeToVersion(LATEST_VERSION);
- } finally {
- verify(migration1, times(1)).run();
- verifyNoMoreInteractions(migration1);
- verifyZeroInteractions(migration2);
- }
- }
-
- public static class InMemorySchemaDAO extends CassandraSchemaVersionDAO {
- private int currentVersion;
-
- public InMemorySchemaDAO(int currentVersion) {
- super(mock(Session.class), null);
- this.currentVersion = currentVersion;
- }
-
- @Override
- public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() {
- return CompletableFuture.completedFuture(Optional.of(currentVersion));
- }
-
- @Override
- public CompletableFuture<Void> updateVersion(int newVersion) {
- currentVersion = newVersion;
- return CompletableFuture.completedFuture(null);
- }
- }
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org