You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/30 04:45:23 UTC
[james-project] branch master updated: [BUILD] Provision Cassandra tables in parallel (#1159)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 6be4be1894 [BUILD] Provision Cassandra tables in parallel (#1159)
6be4be1894 is described below
commit 6be4be18949e24b740f35d211e7691463be4872a
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Aug 30 11:45:18 2022 +0700
[BUILD] Provision Cassandra tables in parallel (#1159)
Locally, table creation went from 42s to 6s.
---
.../cassandra/components/CassandraModule.java | 6 +-
.../cassandra/components/CassandraTable.java | 13 +++--
.../cassandra/components/CassandraType.java | 12 ++--
.../cassandra/init/CassandraTableManager.java | 10 ++--
.../cassandra/init/CassandraTypesCreator.java | 12 ++--
.../cassandra/components/CassandraTableTest.java | 66 ++++++++++++++++++++--
.../cassandra/components/CassandraTypeTest.java | 66 ++++++++++++++++++++--
7 files changed, 153 insertions(+), 32 deletions(-)
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java
index 45f5ecd1f0..7653e4de72 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraModule.java
@@ -142,8 +142,7 @@ public interface CassandraModule {
Preconditions.checkState(comment.isPresent(), "`comment` is compulsory");
Function<CassandraTypesProvider, CreateTable> createStatement = toCreateStatement.apply(
- createTable(tableName)
- .ifNotExists());
+ createTable(tableName));
Function<CassandraTypesProvider, CreateTableWithOptions> finalStatement = options.map(optionTramsformation ->
createStatement.andThen(table -> optionTramsformation.apply(table).withComment(comment.get())))
@@ -166,8 +165,7 @@ public interface CassandraModule {
public Builder statement(Function<CreateTypeStart, CreateType> createStatement) {
return originalBuilderReference.addType(
new CassandraType(typeName, createStatement.apply(
- createType(typeName)
- .ifNotExists())));
+ createType(typeName))));
}
}
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java
index 6b52e9d3f0..f8339eff61 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraTable.java
@@ -30,6 +30,8 @@ import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import com.google.common.base.MoreObjects;
+import reactor.core.publisher.Mono;
+
public class CassandraTable {
public enum InitializationStatus {
ALREADY_DONE,
@@ -57,15 +59,14 @@ public class CassandraTable {
return name;
}
- public InitializationStatus initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session, CassandraTypesProvider typesProvider) {
+ public Mono<InitializationStatus> initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session, CassandraTypesProvider typesProvider) {
if (keyspaceMetadata.getTable(name).isPresent()) {
- return InitializationStatus.ALREADY_DONE;
+ return Mono.just(InitializationStatus.ALREADY_DONE);
}
- session.execute(createStatement.apply(typesProvider).build()
- .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session)));
-
- return InitializationStatus.FULL;
+ return Mono.from(session.executeReactive(createStatement.apply(typesProvider).build()
+ .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session))))
+ .thenReturn(InitializationStatus.FULL);
}
@Override
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java
index 5a8dfe6554..ea27c3f865 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraType.java
@@ -29,6 +29,8 @@ import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.querybuilder.schema.CreateType;
import com.google.common.base.MoreObjects;
+import reactor.core.publisher.Mono;
+
public class CassandraType {
public enum InitializationStatus {
ALREADY_DONE,
@@ -56,14 +58,14 @@ public class CassandraType {
return name;
}
- public InitializationStatus initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session) {
+ public Mono<InitializationStatus> initialize(KeyspaceMetadata keyspaceMetadata, CqlSession session) {
if (keyspaceMetadata.getUserDefinedTypes().get(CqlIdentifier.fromCql(name)) != null) {
- return InitializationStatus.ALREADY_DONE;
+ return Mono.just(InitializationStatus.ALREADY_DONE);
}
- session.execute(createStatement.build()
- .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session)));
- return InitializationStatus.FULL;
+ return Mono.from(session.executeReactive(createStatement.build()
+ .setExecutionProfile(JamesExecutionProfiles.getTableCreationProfile(session))))
+ .thenReturn(InitializationStatus.FULL);
}
@Override
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
index 6eae69997c..498e750d24 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
@@ -51,11 +51,11 @@ public class CassandraTableManager {
public InitializationStatus initializeTables(CassandraTypesProvider typesProvider) {
KeyspaceMetadata keyspaceMetadata = session.getMetadata().getKeyspaces().get(session.getKeyspace().get());
- return module.moduleTables()
- .stream()
- .map(table -> table.initialize(keyspaceMetadata, session, typesProvider))
- .reduce((left, right) -> left.reduce(right))
- .orElse(InitializationStatus.ALREADY_DONE);
+ return Flux.fromIterable(module.moduleTables())
+ .flatMap(table -> table.initialize(keyspaceMetadata, session, typesProvider))
+ .reduce(InitializationStatus::reduce)
+ .switchIfEmpty(Mono.just(InitializationStatus.ALREADY_DONE))
+ .block();
}
public void clearAllTables() {
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java
index 6a81379e1a..3581f9b45b 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTypesCreator.java
@@ -27,6 +27,9 @@ import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
public class CassandraTypesCreator {
private final ImmutableList<CassandraType> types;
private final CqlSession session;
@@ -39,9 +42,10 @@ public class CassandraTypesCreator {
public InitializationStatus initializeTypes() {
KeyspaceMetadata keyspaceMetadata = session.getMetadata().getKeyspaces().get(session.getKeyspace().get());
- return types.stream()
- .map(type -> type.initialize(keyspaceMetadata, session))
- .reduce(InitializationStatus::reduce)
- .orElse(InitializationStatus.ALREADY_DONE);
+ return Flux.fromIterable(types)
+ .flatMap(type -> type.initialize(keyspaceMetadata, session))
+ .reduce(InitializationStatus::reduce)
+ .switchIfEmpty(Mono.just(InitializationStatus.ALREADY_DONE))
+ .block();
}
}
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java
index 900296b7e1..4f3be609c3 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTableTest.java
@@ -22,6 +22,7 @@ import static org.apache.james.backends.cassandra.components.CassandraTable.Init
import static org.apache.james.backends.cassandra.components.CassandraTable.InitializationStatus.FULL;
import static org.apache.james.backends.cassandra.components.CassandraTable.InitializationStatus.PARTIAL;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -32,16 +33,24 @@ import java.util.stream.Stream;
import org.apache.james.backends.cassandra.components.CassandraTable.InitializationStatus;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.DataTypes;
@@ -49,6 +58,7 @@ import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
import nl.jqno.equalsverifier.EqualsVerifier;
+import reactor.core.publisher.Mono;
class CassandraTableTest {
private static final String NAME = "tableName";
@@ -73,15 +83,39 @@ class CassandraTableTest {
DriverContext context = mock(DriverContext.class);
DriverConfig config = mock(DriverConfig.class);
when(session.getContext()).thenReturn(context);
+ when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+ @NotNull
+ @Override
+ public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<Boolean> wasApplied() {
+ return Mono.just(true);
+ }
+
+ @Override
+ public void subscribe(Subscriber<? super ReactiveRow> s) {
+ s.onComplete();
+ }
+ });
when(context.getConfig()).thenReturn(config);
when(config.getProfiles()).thenReturn(ImmutableMap.of());
when(config.getDefaultProfile()).thenReturn(mock(DriverExecutionProfile.class));
- assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)))
+ assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)).block())
.isEqualByComparingTo(FULL);
verify(keyspace).getTable(NAME);
- verify(session).execute(STATEMENT.build());
+ verify(session).executeReactive(STATEMENT.build());
}
@Test
@@ -89,12 +123,36 @@ class CassandraTableTest {
KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
when(keyspace.getTable(NAME)).thenReturn(Optional.of(mock(TableMetadata.class)));
CqlSession session = mock(CqlSession.class);
+ when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+ @NotNull
+ @Override
+ public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<Boolean> wasApplied() {
+ return Mono.just(true);
+ }
+
+ @Override
+ public void subscribe(Subscriber<? super ReactiveRow> s) {
+ s.onComplete();
+ }
+ });
- assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)))
+ assertThat(TABLE.initialize(keyspace, session, new CassandraTypesProvider(session)).block())
.isEqualByComparingTo(ALREADY_DONE);
verify(keyspace).getTable(NAME);
- verify(session, never()).execute(STATEMENT.build());
+ verify(session, never()).executeReactive(STATEMENT.build());
}
@ParameterizedTest
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java
index 1bde503f7a..5564241ac0 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/components/CassandraTypeTest.java
@@ -22,6 +22,7 @@ import static org.apache.james.backends.cassandra.components.CassandraType.Initi
import static org.apache.james.backends.cassandra.components.CassandraType.InitializationStatus.FULL;
import static org.apache.james.backends.cassandra.components.CassandraType.InitializationStatus.PARTIAL;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -30,17 +31,25 @@ import static org.mockito.Mockito.when;
import java.util.stream.Stream;
import org.apache.james.backends.cassandra.components.CassandraType.InitializationStatus;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
@@ -48,6 +57,7 @@ import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateType;
import nl.jqno.equalsverifier.EqualsVerifier;
+import reactor.core.publisher.Mono;
class CassandraTypeTest {
private static final String NAME = "typeName";
@@ -73,14 +83,38 @@ class CassandraTypeTest {
DriverContext context = mock(DriverContext.class);
DriverConfig config = mock(DriverConfig.class);
when(session.getContext()).thenReturn(context);
+ when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+ @NotNull
+ @Override
+ public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<Boolean> wasApplied() {
+ return Mono.just(true);
+ }
+
+ @Override
+ public void subscribe(Subscriber<? super ReactiveRow> s) {
+ s.onComplete();
+ }
+ });
when(context.getConfig()).thenReturn(config);
when(config.getProfiles()).thenReturn(ImmutableMap.of());
when(config.getDefaultProfile()).thenReturn(mock(DriverExecutionProfile.class));
- assertThat(TYPE.initialize(keyspace, session))
+ assertThat(TYPE.initialize(keyspace, session).block())
.isEqualByComparingTo(FULL);
- verify(session).execute(STATEMENT.build());
+ verify(session).executeReactive(STATEMENT.build());
}
@Test
@@ -88,11 +122,35 @@ class CassandraTypeTest {
KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class);
when(keyspace.getUserDefinedTypes()).thenReturn(ImmutableMap.of(CqlIdentifier.fromCql(NAME), mock(UserDefinedType.class)));
CqlSession session = mock(CqlSession.class);
+ when(session.executeReactive(any(SimpleStatement.class))).thenReturn(new ReactiveResultSet() {
+ @NotNull
+ @Override
+ public Publisher<? extends ColumnDefinitions> getColumnDefinitions() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<? extends ExecutionInfo> getExecutionInfos() {
+ return Mono.empty();
+ }
+
+ @NotNull
+ @Override
+ public Publisher<Boolean> wasApplied() {
+ return Mono.just(true);
+ }
+
+ @Override
+ public void subscribe(Subscriber<? super ReactiveRow> s) {
+ s.onComplete();
+ }
+ });
- assertThat(TYPE.initialize(keyspace, session))
+ assertThat(TYPE.initialize(keyspace, session).block())
.isEqualByComparingTo(ALREADY_DONE);
- verify(session, never()).execute(STATEMENT.build());
+ verify(session, never()).executeReactive(STATEMENT.build());
}
@ParameterizedTest
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org