You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/02/25 02:25:38 UTC

[james-project] 01/10: JAMES-3059 Allow Cassandra test suite to inject fault at the session level

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 82438a76f99a620d08acfc712b2950e76f1c27a5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Feb 21 12:59:10 2020 +0700

    JAMES-3059 Allow Cassandra test suite to inject fault at the session level
---
 .../james/backends/cassandra/CassandraCluster.java |   9 +-
 .../james/backends/cassandra/TestingSession.java   | 245 +++++++++++++++++++++
 .../backends/cassandra/TestingSessionTest.java     | 161 ++++++++++++++
 3 files changed, 411 insertions(+), 4 deletions(-)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
index f4be201..4b4854b 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
@@ -30,14 +30,13 @@ import org.apache.james.backends.cassandra.init.configuration.ClusterConfigurati
 import org.apache.james.util.Host;
 
 import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
 
 public final class CassandraCluster implements AutoCloseable {
     public static final String KEYSPACE = "testing";
 
     private static Optional<Exception> startStackTrace = Optional.empty();
     private final CassandraModule module;
-    private Session session;
+    private TestingSession session;
     private CassandraTypesProvider typesProvider;
     private Cluster cluster;
 
@@ -65,14 +64,15 @@ public final class CassandraCluster implements AutoCloseable {
                 .build();
             cluster = ClusterFactory.create(clusterConfiguration);
             KeyspaceFactory.createKeyspace(clusterConfiguration, cluster);
-            session = new SessionWithInitializedTablesFactory(clusterConfiguration, cluster, module).get();
+            session = new TestingSession(
+                new SessionWithInitializedTablesFactory(clusterConfiguration, cluster, module).get());
             typesProvider = new CassandraTypesProvider(module, session);
         } catch (Exception exception) {
             throw new RuntimeException(exception);
         }
     }
 
-    public Session getConf() {
+    public TestingSession getConf() {
         return session;
     }
 
@@ -82,6 +82,7 @@ public final class CassandraCluster implements AutoCloseable {
 
     @Override
     public void close() {
+        session.resetExecutionHook();
         if (!cluster.isClosed()) {
             clearTables();
             closeCluster();
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
new file mode 100644
index 0000000..5e0ad9d
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
@@ -0,0 +1,245 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.CloseFuture;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.RegularStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class TestingSession implements Session {
+    enum Behavior {
+        THROW((session, statement) -> {
+            RuntimeException injected_failure = new RuntimeException("Injected failure");
+            injected_failure.printStackTrace();
+            throw injected_failure;
+        }),
+        EXECUTE_NORMALLY(Session::executeAsync);
+
+        private final BiFunction<Session, Statement, ResultSetFuture> behaviour;
+
+        Behavior(BiFunction<Session, Statement, ResultSetFuture> behaviour) {
+            this.behaviour = behaviour;
+        }
+
+        ResultSetFuture execute(Session session, Statement statement) {
+            return behaviour.apply(session, statement);
+        }
+    }
+
+    @FunctionalInterface
+    interface StatementPredicate extends Predicate<Statement> {
+
+    }
+
+    static class BoundStatementStartingWith implements StatementPredicate {
+        private final String queryStringPrefix;
+
+        BoundStatementStartingWith(String queryStringPrefix) {
+            this.queryStringPrefix = queryStringPrefix;
+        }
+
+        @Override
+        public boolean test(Statement statement) {
+            if (statement instanceof BoundStatement) {
+                BoundStatement boundStatement = (BoundStatement) statement;
+                return boundStatement.preparedStatement()
+                    .getQueryString()
+                    .startsWith(queryStringPrefix);
+            }
+            return false;
+        }
+    }
+
+    @FunctionalInterface
+    public interface RequiresCondition {
+        RequiresApplyCount condition(StatementPredicate statementPredicate);
+
+        default RequiresApplyCount always() {
+            return condition(ALL_STATEMENTS);
+        }
+
+        default RequiresApplyCount whenBoundStatementStartsWith(String queryStringPrefix) {
+            return condition(new BoundStatementStartingWith(queryStringPrefix));
+        }
+    }
+
+    @FunctionalInterface
+    public interface RequiresApplyCount {
+        FinalStage times(int applyCount);
+    }
+
+    @FunctionalInterface
+    public interface FinalStage {
+        void setExecutionHook();
+    }
+
+    private static class ExecutionHook {
+        final StatementPredicate statementPredicate;
+        final Behavior behavior;
+        final AtomicInteger remaining;
+
+        private ExecutionHook(StatementPredicate statementPredicate, Behavior behavior, int applyCount) {
+            this.statementPredicate = statementPredicate;
+            this.behavior = behavior;
+            this.remaining = new AtomicInteger(applyCount);
+        }
+
+        ResultSetFuture execute(Session session, Statement statement) {
+            if (statementPredicate.test(statement)) {
+                int hookPosition = remaining.getAndDecrement();
+                if (hookPosition > 0) {
+                    return behavior.execute(session, statement);
+                }
+            }
+            return Behavior.EXECUTE_NORMALLY.execute(session, statement);
+        }
+    }
+
+    private static StatementPredicate ALL_STATEMENTS = statement -> true;
+    private static ExecutionHook NO_EXECUTION_HOOK = new ExecutionHook(ALL_STATEMENTS, Behavior.EXECUTE_NORMALLY, 0);
+
+    private final Session delegate;
+    private volatile ExecutionHook executionHook;
+
+    TestingSession(Session delegate) {
+        this.delegate = delegate;
+        this.executionHook = NO_EXECUTION_HOOK;
+    }
+
+    public RequiresCondition fail() {
+        return condition -> applyCount -> () -> executionHook = new ExecutionHook(condition, Behavior.THROW, applyCount);
+    }
+
+    public void resetExecutionHook() {
+        executionHook = NO_EXECUTION_HOOK;
+    }
+
+    @Override
+    public String getLoggedKeyspace() {
+        return delegate.getLoggedKeyspace();
+    }
+
+    @Override
+    public Session init() {
+        return delegate.init();
+    }
+
+    @Override
+    public ListenableFuture<Session> initAsync() {
+        return delegate.initAsync();
+    }
+
+    @Override
+    public ResultSet execute(String query) {
+        return delegate.execute(query);
+    }
+
+    @Override
+    public ResultSet execute(String query, Object... values) {
+        return delegate.execute(query, values);
+    }
+
+    @Override
+    public ResultSet execute(String query, Map<String, Object> values) {
+        return delegate.execute(query, values);
+    }
+
+    @Override
+    public ResultSet execute(Statement statement) {
+        return delegate.execute(statement);
+    }
+
+    @Override
+    public ResultSetFuture executeAsync(String query) {
+        return delegate.executeAsync(query);
+    }
+
+    @Override
+    public ResultSetFuture executeAsync(String query, Object... values) {
+        return delegate.executeAsync(query, values);
+    }
+
+    @Override
+    public ResultSetFuture executeAsync(String query, Map<String, Object> values) {
+        return delegate.executeAsync(query, values);
+    }
+
+    @Override
+    public ResultSetFuture executeAsync(Statement statement) {
+        return executionHook.execute(delegate, statement);
+    }
+
+    @Override
+    public PreparedStatement prepare(String query) {
+        return delegate.prepare(query);
+    }
+
+    @Override
+    public PreparedStatement prepare(RegularStatement statement) {
+        return delegate.prepare(statement);
+    }
+
+    @Override
+    public ListenableFuture<PreparedStatement> prepareAsync(String query) {
+        return delegate.prepareAsync(query);
+    }
+
+    @Override
+    public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
+        return delegate.prepareAsync(statement);
+    }
+
+    @Override
+    public CloseFuture closeAsync() {
+        return delegate.closeAsync();
+    }
+
+    @Override
+    public void close() {
+        delegate.close();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return delegate.isClosed();
+    }
+
+    @Override
+    public Cluster getCluster() {
+        return delegate.getCluster();
+    }
+
+    @Override
+    public State getState() {
+        return delegate.getState();
+    }
+}
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
new file mode 100644
index 0000000..1983e99
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
@@ -0,0 +1,161 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.backends.cassandra.versions.SchemaVersion;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class TestingSessionTest {
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraSchemaVersionModule.MODULE);
+
+    private CassandraSchemaVersionDAO dao;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        dao = new CassandraSchemaVersionDAO(cassandra.getConf());
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedByDefault() {
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedWhenNotMatching(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("non matching")
+            .times(1)
+            .setExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedWhenTimesIsZero(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(0)
+            .setExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldNotBeInstrumentedWhenTimesIsNegative(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(-1)
+            .setExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void daoOperationShouldFailWhenInstrumented(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+            .isInstanceOf(RuntimeException.class);
+    }
+
+    @Test
+    void daoShouldNotBeInstrumentedWhenTimesIsExceeded(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        try {
+            dao.getCurrentSchemaVersion().block();
+        } catch (Exception e) {
+            // discard expected exception
+        }
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void timesShouldSpecifyExactlyTheFailureCount(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(2)
+            .setExecutionHook();
+
+        SoftAssertions.assertSoftly(softly -> {
+            assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+                .isInstanceOf(RuntimeException.class);
+            assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+                .isInstanceOf(RuntimeException.class);
+            assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+                .doesNotThrowAnyException();
+        });
+    }
+
+    @Test
+    void resetExecutionHookShouldClearInstrumentation(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        cassandra.getConf().resetExecutionHook();
+
+        assertThatCode(() -> dao.getCurrentSchemaVersion().block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    void timesShouldBeTakenIntoAccountOnlyForMatchingStatements(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .fail()
+            .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;")
+            .times(1)
+            .setExecutionHook();
+
+        dao.updateVersion(new SchemaVersion(36)).block();
+
+        assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
+            .isInstanceOf(RuntimeException.class);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org