You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/30 04:45:23 UTC

[james-project] branch master updated: [BUILD] Provision Cassandra tables in parallel (#1159)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 6be4be1894 [BUILD] Provision Cassandra tables in parallel (#1159)
6be4be1894 is described below

commit 6be4be18949e24b740f35d211e7691463be4872a
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Aug 30 11:45:18 2022 +0700

    [BUILD] Provision Cassandra tables in parallel (#1159)
    
    Locally, table creation went from 42s to 6s.
---
 .../cassandra/components/CassandraModule.java      |  6 +-
 .../cassandra/components/CassandraTable.java       | 13 +++--
 .../cassandra/components/CassandraType.java        | 12 ++--
 .../cassandra/init/CassandraTableManager.java      | 10 ++--
 .../cassandra/init/CassandraTypesCreator.java      | 12 ++--
 .../cassandra/components/CassandraTableTest.java   | 66 ++++++++++++++++++++--
 .../cassandra/components/CassandraTypeTest.java    | 66 ++++++++++++++++++++--
 7 files changed, 153 insertions(+), 32 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java
index 45f5ecd1f0..7653e4de72 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java
@@ -142,8 +142,7 @@ public interface CassandraModule {
             Preconditions.checkState(comment.isPresent(), "`comment` is compulsory");
 
             Function<CassandraTypesProvider, CreateTable> createStatement = toCreateStatement.apply(
-                createTable(tableName)
-                    .ifNotExists());
+                createTable(tableName));
 
             Function<CassandraTypesProvider, CreateTableWithOptions> finalStatement = options.map(optionTramsformation ->
                createStatement.andThen(table -> optionTramsformation.apply(table).withComment(comment.get())))
@@ -166,8 +165,7 @@ public interface CassandraModule {
         public Builder statement(Function<CreateTypeStart, CreateType> createStatement) {
             return originalBuilderReference.addType(
                 new CassandraType(typeName, createStatement.apply(
-                    createType(typeName)
-                        .ifNotExists())));
+                    createType(typeName))));
         }
     }
 
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java
index 6b52e9d3f0..f8339eff61 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java
@@ -30,6 +30,8 @@ import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
 import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
 import com.google.common.base.MoreObjects;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraTable {
     public enum InitializationStatus {
         ALREADY_DONE,
@@ -57,15 +59,14 @@ public class CassandraTable {
         return name;
     }
 
-    public InitializationStatus initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session, CassandraTypesProvider typesProvider) {
+    public Mono<InitializationStatus> initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session, CassandraTypesProvider typesProvider) {
         if (keyspaceMetadata.getTable(name).isPresent()) {
-            return InitializationStatus.ALREADY_DONE;
+            return Mono.just(InitializationStatus.ALREADY_DONE);
         }
 
-        session.execute(createStatement.apply(typesProvider).build()
-            .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session)));
-
-        return InitializationStatus.FULL;
+        return Mono.from(session.executeReactive(createStatement.apply(typesProvider).build()
+            .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session))))
+            .thenReturn(InitializationStatus.FULL);
     }
 
     @Override
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java
index 5a8dfe6554..ea27c3f865 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java
@@ -29,6 +29,8 @@ import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
 import com.datastax.oss.driver.api.querybuilder.schema.CreateType;
 import com.google.common.base.MoreObjects;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraType {
     public enum InitializationStatus {
         ALREADY_DONE,
@@ -56,14 +58,14 @@ public class CassandraType {
         return name;
     }
 
-    public InitializationStatus initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session) {
+    public Mono<InitializationStatus> initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session) {
         if (keyspaceMetadata.getUserDefinedTypes().get(CqlIdentifier.fromCql(name)) != null) {
-            return InitializationStatus.ALREADY_DONE;
+            return Mono.just(InitializationStatus.ALREADY_DONE);
         }
 
-        session.execute(createStatement.build()
-            .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session)));
-        return InitializationStatus.FULL;
+        return Mono.from(session.executeReactive(createStatement.build()
+            .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session))))
+            .thenReturn(InitializationStatus.FULL);
     }
 
     @Override
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
index 6eae69997c..498e750d24 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
@@ -51,11 +51,11 @@ public class CassandraTableManager {
     public InitializationStatus initializeTables(CassandraTypesProvider typesProvider) {
         KeyspaceMetadata keyspaceMetadata = session.getMetadata().getKeyspaces().get(session.getKeyspace().get());
 
-        return module.moduleTables()
-                .stream()
-                .map(table -> table.initialize(keyspaceMetadata, session, typesProvider))
-                .reduce((left, right) -> left.reduce(right))
-                .orElse(InitializationStatus.ALREADY_DONE);
+        return Flux.fromIterable(module.moduleTables())
+            .flatMap(table -> table.initialize(keyspaceMetadata, session, typesProvider))
+            .reduce(InitializationStatus::reduce)
+            .switchIfEmpty(Mono.just(InitializationStatus.ALREADY_DONE))
+            .block();
     }
 
     public void clearAllTables() {
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java
index 6a81379e1a..3581f9b45b 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java
@@ -27,6 +27,9 @@ import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class CassandraTypesCreator {
     private final ImmutableList<CassandraType> types;
     private final CqlSession session;
@@ -39,9 +42,10 @@ public class CassandraTypesCreator {
     public InitializationStatus initializeTypes() {
         KeyspaceMetadata keyspaceMetadata = session.getMetadata().getKeyspaces().get(session.getKeyspace().get());
 
-        return types.stream()
-                .map(type -> type.initialize(keyspaceMetadata, session))
-                .reduce(InitializationStatus::reduce)
-                .orElse(InitializationStatus.ALREADY_DONE);
+        return Flux.fromIterable(types)
+            .flatMap(type -> type.initialize(keyspaceMetadata, session))
+            .reduce(InitializationStatus::reduce)
+            .switchIfEmpty(Mono.just(InitializationStatus.ALREADY_DONE))
+            .block();
     }
 }
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java
index 900296b7e1..4f3be609c3 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java
@@ -22,6 +22,7 @@ import static org.apache.james.backends.cassandra.components.CassandraTable.Init
 import static org.apache.james.backends.cassandra.components.CassandraTable.InitializationStatus.FULL;
 import static org.apache.james.backends.cassandra.components.CassandraTable.InitializationStatus.PARTIAL;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -32,16 +33,24 @@ import java.util.stream.Stream;
 
 import org.apache.james.backends.cassandra.components.CassandraTable.InitializationStatus;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.config.DriverConfig;
 import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
 import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
 import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
 import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
 import com.datastax.oss.driver.api.core.type.DataTypes;
@@ -49,6 +58,7 @@ import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
 import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
+import reactor.core.publisher.Mono;
 
 class CassandraTableTest {
     private static final String NAME = "tableName";
@@ -73,15 +83,39 @@ class CassandraTableTest {
         DriverContext context = mock(DriverContext.class);
         DriverConfig config = mock(DriverConfig.class);
         when(session.getContext()).thenReturn(context);
+        when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+            @NotNull
+            @Override
+            public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<Boolean> wasApplied() {
+                return Mono.just(true);
+            }
+
+            @Override
+            public void subscribe(Subscriber<? super ReactiveRow> s) {
+                s.onComplete();
+            }
+        });
         when(context.getConfig()).thenReturn(config);
         when(config.getProfiles()).thenReturn(ImmutableMap.of());
         when(config.getDefaultProfile()).thenReturn(mock(DriverExecutionProfile.class));
 
-        assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)))
+        assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)).block())
                 .isEqualByComparingTo(FULL);
 
         verify(keyspace).getTable(NAME);
-        verify(session).execute(STATEMENT.build());
+        verify(session).executeReactive(STATEMENT.build());
     }
 
     @Test
@@ -89,12 +123,36 @@ class CassandraTableTest {
         KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
         when(keyspace.getTable(NAME)).thenReturn(Optional.of(mock(TableMetadata.class)));
         CqlSession session = mock(CqlSession.class);
+        when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+            @NotNull
+            @Override
+            public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<Boolean> wasApplied() {
+                return Mono.just(true);
+            }
+
+            @Override
+            public void subscribe(Subscriber<? super ReactiveRow> s) {
+                s.onComplete();
+            }
+        });
 
-        assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)))
+        assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)).block())
                 .isEqualByComparingTo(ALREADY_DONE);
 
         verify(keyspace).getTable(NAME);
-        verify(session, never()).execute(STATEMENT.build());
+        verify(session, never()).executeReactive(STATEMENT.build());
     }
 
     @ParameterizedTest
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java
index 1bde503f7a..5564241ac0 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java
@@ -22,6 +22,7 @@ import static org.apache.james.backends.cassandra.components.CassandraType.Initi
 import static org.apache.james.backends.cassandra.components.CassandraType.InitializationStatus.FULL;
 import static org.apache.james.backends.cassandra.components.CassandraType.InitializationStatus.PARTIAL;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -30,17 +31,25 @@ import static org.mockito.Mockito.when;
 import java.util.stream.Stream;
 
 import org.apache.james.backends.cassandra.components.CassandraType.InitializationStatus;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
 import com.datastax.oss.driver.api.core.CqlIdentifier;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.config.DriverConfig;
 import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
 import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
 import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
 import com.datastax.oss.driver.api.core.type.DataTypes;
 import com.datastax.oss.driver.api.core.type.UserDefinedType;
@@ -48,6 +57,7 @@ import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
 import com.datastax.oss.driver.api.querybuilder.schema.CreateType;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
+import reactor.core.publisher.Mono;
 
 class CassandraTypeTest {
     private static final String NAME = "typeName";
@@ -73,14 +83,38 @@ class CassandraTypeTest {
         DriverContext context = mock(DriverContext.class);
         DriverConfig config = mock(DriverConfig.class);
         when(session.getContext()).thenReturn(context);
+        when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+            @NotNull
+            @Override
+            public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<Boolean> wasApplied() {
+                return Mono.just(true);
+            }
+
+            @Override
+            public void subscribe(Subscriber<? super ReactiveRow> s) {
+                s.onComplete();
+            }
+        });
         when(context.getConfig()).thenReturn(config);
         when(config.getProfiles()).thenReturn(ImmutableMap.of());
         when(config.getDefaultProfile()).thenReturn(mock(DriverExecutionProfile.class));
 
-        assertThat(TYPE.initialize(keyspace, session))
+        assertThat(TYPE.initialize(keyspace, session).block())
                 .isEqualByComparingTo(FULL);
 
-        verify(session).execute(STATEMENT.build());
+        verify(session).executeReactive(STATEMENT.build());
     }
 
     @Test
@@ -88,11 +122,35 @@ class CassandraTypeTest {
         KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
         when(keyspace.getUserDefinedTypes()).thenReturn(ImmutableMap.of(CqlIdentifier.fromCql(NAME), mock(UserDefinedType.class)));
         CqlSession session = mock(CqlSession.class);
+        when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+            @NotNull
+            @Override
+            public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+                return Mono.empty();
+            }
+
+            @NotNull
+            @Override
+            public Publisher<Boolean> wasApplied() {
+                return Mono.just(true);
+            }
+
+            @Override
+            public void subscribe(Subscriber<? super ReactiveRow> s) {
+                s.onComplete();
+            }
+        });
 
-        assertThat(TYPE.initialize(keyspace, session))
+        assertThat(TYPE.initialize(keyspace, session).block())
                 .isEqualByComparingTo(ALREADY_DONE);
 
-        verify(session, never()).execute(STATEMENT.build());
+        verify(session, never()).executeReactive(STATEMENT.build());
     }
 
     @ParameterizedTest


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org