You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/02/25 02:25:38 UTC
[james-project] 01/10: JAMES-3059 Allow Cassandra test suite to
inject fault at the session level
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 82438a76f99a620d08acfc712b2950e76f1c27a5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Feb 21 12:59:10 2020 +0700
JAMES-3059 Allow Cassandra test suite to inject fault at the session level
---
.../james/backends/cassandra/CassandraCluster.java | 9 +-
.../james/backends/cassandra/TestingSession.java | 245 +++++++++++++++++++++
.../backends/cassandra/TestingSessionTest.java | 161 ++++++++++++++
3 files changed, 411 insertions(+), 4 deletions(-)
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
index f4be201..4b4854b 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
@@ -30,14 +30,13 @@ import org.apache.james.backends.cassandra.init.configuration.ClusterConfigurati
import org.apache.james.util.Host;
import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
public final class CassandraCluster implements AutoCloseable {
public static final String KEYSPACE = "testing";
private static Optional<Exception> startStackTrace = Optional.empty();
private final CassandraModule module;
- private Session session;
+ private TestingSession session;
private CassandraTypesProvider typesProvider;
private Cluster cluster;
@@ -65,14 +64,15 @@ public final class CassandraCluster implements AutoCloseable {
.build();
cluster = ClusterFactory.create(clusterConfiguration);
KeyspaceFactory.createKeyspace(clusterConfiguration, cluster);
- session = new SessionWithInitializedTablesFactory(clusterConfiguration, cluster, module).get();
+ session = new TestingSession(
+ new SessionWithInitializedTablesFactory(clusterConfiguration, cluster, module).get());
typesProvider = new CassandraTypesProvider(module, session);
} catch (Exception exception) {
throw new RuntimeException(exception);
}
}
- public Session getConf() {
+ public TestingSession getConf() {
return session;
}
@@ -82,6 +82,7 @@ public final class CassandraCluster implements AutoCloseable {
@Override
public void close() {
+ session.resetExecutionHook();
if (!cluster.isClosed()) {
clearTables();
closeCluster();
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
new file mode 100644
index 0000000..5e0ad9d
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
@@ -0,0 +1,245 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.CloseFuture;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.RegularStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class TestingSession implements Session {
+ enum Behavior {
+ THROW((session, statement) -> {
+ RuntimeException injected_failure = new RuntimeException("Injected failure");
+ injected_failure.printStackTrace();
+ throw injected_failure;
+ }),
+ EXECUTE_NORMALLY(Session::executeAsync);
+
+ private final BiFunction<Session, Statement, ResultSetFuture> behaviour;
+
+ Behavior(BiFunction<Session, Statement, ResultSetFuture> behaviour) {
+ this.behaviour = behaviour;
+ }
+
+ ResultSetFuture execute(Session session, Statement statement) {
+ return behaviour.apply(session, statement);
+ }
+ }
+
+ @FunctionalInterface
+ interface StatementPredicate extends Predicate<Statement> {
+
+ }
+
+ static class BoundStatementStartingWith implements StatementPredicate {
+ private final String queryStringPrefix;
+
+ BoundStatementStartingWith(String queryStringPrefix) {
+ this.queryStringPrefix = queryStringPrefix;
+ }
+
+ @Override
+ public boolean test(Statement statement) {
+ if (statement instanceof BoundStatement) {
+ BoundStatement boundStatement = (BoundStatement) statement;
+ return boundStatement.preparedStatement()
+ .getQueryString()
+ .startsWith(queryStringPrefix);
+ }
+ return false;
+ }
+ }
+
+ @FunctionalInterface
+ public interface RequiresCondition {
+ RequiresApplyCount condition(StatementPredicate statementPredicate);
+
+ default RequiresApplyCount always() {
+ return condition(ALL_STATEMENTS);
+ }
+
+ default RequiresApplyCount whenBoundStatementStartsWith(String queryStringPrefix) {
+ return condition(new BoundStatementStartingWith(queryStringPrefix));
+ }
+ }
+
+ @FunctionalInterface
+ public interface RequiresApplyCount {
+ FinalStage times(int applyCount);
+ }
+
+ @FunctionalInterface
+ public interface FinalStage {
+ void setExecutionHook();
+ }
+
+ private static class ExecutionHook {
+ final StatementPredicate statementPredicate;
+ final Behavior behavior;
+ final AtomicInteger remaining;
+
+ private ExecutionHook(StatementPredicate statementPredicate, Behavior behavior, int applyCount) {
+ this.statementPredicate = statementPredicate;
+ this.behavior = behavior;
+ this.remaining = new AtomicInteger(applyCount);
+ }
+
+ ResultSetFuture execute(Session session, Statement statement) {
+ if (statementPredicate.test(statement)) {
+ int hookPosition = remaining.getAndDecrement();
+ if (hookPosition > 0) {
+ return behavior.execute(session, statement);
+ }
+ }
+ return Behavior.EXECUTE_NORMALLY.execute(session, statement);
+ }
+ }
+
+ private static StatementPredicate ALL_STATEMENTS = statement -> true;
+ private static ExecutionHook NO_EXECUTION_HOOK = new ExecutionHook(ALL_STATEMENTS, Behavior.EXECUTE_NORMALLY, 0);
+
+ private final Session delegate;
+ private volatile ExecutionHook executionHook;
+
+ TestingSession(Session delegate) {
+ this.delegate = delegate;
+ this.executionHook = NO_EXECUTION_HOOK;
+ }
+
+ public RequiresCondition fail() {
+ return condition -> applyCount -> () -> executionHook = new ExecutionHook(condition, Behavior.THROW, applyCount);
+ }
+
+ public void resetExecutionHook() {
+ executionHook = NO_EXECUTION_HOOK;
+ }
+
+ @Override
+ public String getLoggedKeyspace() {
+ return delegate.getLoggedKeyspace();
+ }
+
+ @Override
+ public Session init() {
+ return delegate.init();
+ }
+
+ @Override
+ public ListenableFuture<Session> initAsync() {
+ return delegate.initAsync();
+ }
+
+ @Override
+ public ResultSet execute(String query) {
+ return delegate.execute(query);
+ }
+
+ @Override
+ public ResultSet execute(String query, Object... values) {
+ return delegate.execute(query, values);
+ }
+
+ @Override
+ public ResultSet execute(String query, Map<String, Object> values) {
+ return delegate.execute(query, values);
+ }
+
+ @Override
+ public ResultSet execute(Statement statement) {
+ return delegate.execute(statement);
+ }
+
+ @Override
+ public ResultSetFuture executeAsync(String query) {
+ return delegate.executeAsync(query);
+ }
+
+ @Override
+ public ResultSetFuture executeAsync(String query, Object... values) {
+ return delegate.executeAsync(query, values);
+ }
+
+ @Override
+ public ResultSetFuture executeAsync(String query, Map<String, Object> values) {
+ return delegate.executeAsync(query, values);
+ }
+
+ @Override
+ public ResultSetFuture executeAsync(Statement statement) {
+ return executionHook.execute(delegate, statement);
+ }
+
+ @Override
+ public PreparedStatement prepare(String query) {
+ return delegate.prepare(query);
+ }
+
+ @Override
+ public PreparedStatement prepare(RegularStatement statement) {
+ return delegate.prepare(statement);
+ }
+
+ @Override
+ public ListenableFuture<PreparedStatement> prepareAsync(String query) {
+ return delegate.prepareAsync(query);
+ }
+
+ @Override
+ public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
+ return delegate.prepareAsync(statement);
+ }
+
+ @Override
+ public CloseFuture closeAsync() {
+ return delegate.closeAsync();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return delegate.isClosed();
+ }
+
+ @Override
+ public Cluster getCluster() {
+ return delegate.getCluster();
+ }
+
+ @Override
+ public State getState() {
+ return delegate.getState();
+ }
+}
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
new file mode 100644
index 0000000..1983e99
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
@@ -0,0 +1,161 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.backends.cassandra.versions.SchemaVersion;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Checks the fault injection offered by {@link TestingSession}, using
+ * {@link CassandraSchemaVersionDAO} as the instrumented component.
+ */
+class TestingSessionTest {
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraSchemaVersionModule.MODULE);
+
+    private CassandraSchemaVersionDAO dao;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        dao = new CassandraSchemaVersionDAO(cassandra.getConf());
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedByDefault() {
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedWhenNotMatching(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("non matching")
+            .times(1)
+            .setExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedWhenTimesIsZero(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(0)
+            .setExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedWhenTimesIsNegative(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(-1)
+            .setExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldFailWhenInstrumented(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+            .isInstanceOf(RuntimeException.class);
+    }
+
+    @Test
+    void daoShouldNotBeInstrumentedWhenTimesIsExceeded(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        try {
+            dao.getCurrentSchemaVersion().block();
+        } catch (Exception ignored) {
+            // Expected: this first call consumes the single injected failure.
+        }
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void timesShouldSpecifyExactlyTheFailureCount(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(2)
+            .setExecutionHook();
+
+        // Assertions go through 'softly' so that all three are evaluated and reported together;
+        // the static assertThat* variants would abort the block on the first failure.
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+                .isInstanceOf(RuntimeException.class);
+            softly.assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+                .isInstanceOf(RuntimeException.class);
+            softly.assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+                .doesNotThrowAnyException();
+        });
+    }
+
+    @Test
+    void resetExecutionHookShouldClearInstrumentation(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        cassandra.getConf().resetExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void timesShouldBeTakenIntoAccountOnlyForMatchingStatements(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        // This update is expected to complete normally; the SELECT below must still fail afterwards.
+        dao.updateVersion(new SchemaVersion(36)).block();
+
+        assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+            .isInstanceOf(RuntimeException.class);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org