You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/18 11:06:57 UTC

[1/3] james-project git commit: JAMES-2645 Refactor cassandra RRTs by adding a proper DAO class with basic tests

Repository: james-project
Updated Branches:
  refs/heads/master 07bc44463 -> a760297cc


JAMES-2645 Refactor cassandra RRTs by adding a proper DAO class with basic tests


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c216bce6
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c216bce6
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c216bce6

Branch: refs/heads/master
Commit: c216bce6a30037fc7115068599dc15c89a400a07
Parents: 07bc444
Author: Rene Cordier <rc...@linagora.com>
Authored: Thu Jan 17 11:40:38 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 18 18:01:20 2019 +0700

----------------------------------------------------------------------
 .../CassandraRecipientRewriteTable.java         | 128 ++--------------
 .../CassandraRecipientRewriteTableDAO.java      | 150 +++++++++++++++++++
 .../CassandraRecipientRewriteTableDAOTest.java  | 100 +++++++++++++
 .../CassandraRecipientRewriteTableTest.java     |   3 +-
 .../james/rrt/cassandra/CassandraStepdefs.java  |   3 +-
 5 files changed, 264 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 332ec43..27bc7d4 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -18,24 +18,10 @@
  ****************************************************************/
 package org.apache.james.rrt.cassandra;
 
-import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.DOMAIN;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.MAPPING;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER;
-
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import javax.inject.Inject;
 
-import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.rrt.lib.AbstractRecipientRewriteTable;
 import org.apache.james.rrt.lib.Mapping;
@@ -44,135 +30,41 @@ import org.apache.james.rrt.lib.Mappings;
 import org.apache.james.rrt.lib.MappingsImpl;
 import org.apache.james.util.OptionalUtils;
 
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.github.steveash.guavate.Guavate;
-
 public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
-
-    private final CassandraAsyncExecutor executor;
-    private final CassandraUtils cassandraUtils;
-    private final PreparedStatement insertStatement;
-    private final PreparedStatement deleteStatement;
-    private final PreparedStatement retrieveMappingStatement;
-    private final PreparedStatement retrieveAllMappingsStatement;
+    private final CassandraRecipientRewriteTableDAO dao;
 
     @Inject
-    public CassandraRecipientRewriteTable(Session session, CassandraUtils cassandraUtils) {
-        this.executor = new CassandraAsyncExecutor(session);
-        this.cassandraUtils = cassandraUtils;
-        this.insertStatement = prepareInsertStatement(session);
-        this.deleteStatement = prepareDelete(session);
-        this.retrieveMappingStatement = prepareRetrieveMappingStatement(session);
-        this.retrieveAllMappingsStatement = prepareRetrieveAllMappingStatement(session);
-    }
-
-    private PreparedStatement prepareRetrieveAllMappingStatement(Session session) {
-        return session.prepare(select(USER, DOMAIN, MAPPING)
-            .from(TABLE_NAME));
-    }
-
-    private PreparedStatement prepareRetrieveMappingStatement(Session session) {
-        return session.prepare(select(MAPPING)
-            .from(TABLE_NAME)
-            .where(eq(USER, bindMarker(USER)))
-            .and(eq(DOMAIN, bindMarker(DOMAIN))));
-    }
-
-    private PreparedStatement prepareDelete(Session session) {
-        return session.prepare(delete()
-            .from(TABLE_NAME)
-            .where(eq(USER, bindMarker(USER)))
-            .and(eq(DOMAIN, bindMarker(DOMAIN)))
-            .and(eq(MAPPING, bindMarker(MAPPING))));
-    }
-
-    private PreparedStatement prepareInsertStatement(Session session) {
-        return session.prepare(insertInto(TABLE_NAME)
-            .value(USER, bindMarker(USER))
-            .value(DOMAIN, bindMarker(DOMAIN))
-            .value(MAPPING, bindMarker(MAPPING)));
+    public CassandraRecipientRewriteTable(CassandraRecipientRewriteTableDAO dao) {
+        this.dao = dao;
     }
 
     @Override
     public void addMapping(MappingSource source, Mapping mapping) {
-        executor.executeVoid(insertStatement.bind()
-            .setString(USER, source.getFixedUser())
-            .setString(DOMAIN, source.getFixedDomain())
-            .setString(MAPPING, mapping.asString()))
-            .join();
+        dao.addMapping(source, mapping).block();
     }
 
     @Override
     public void removeMapping(MappingSource source, Mapping mapping) {
-        executor.executeVoid(deleteStatement.bind()
-                .setString(USER, source.getFixedUser())
-                .setString(DOMAIN, source.getFixedDomain())
-                .setString(MAPPING, mapping.asString()))
-            .join();
+        dao.removeMapping(source, mapping).block();
     }
 
     @Override
     public Mappings getStoredMappings(MappingSource source) {
-        return retrieveMappings(source)
+        return dao.retrieveMappings(source)
+            .blockOptional()
             .orElse(MappingsImpl.empty());
     }
 
-    private Optional<Mappings> retrieveMappings(MappingSource source) {
-        List<String> mappings = executor.execute(retrieveMappingStatement.bind()
-            .setString(USER, source.getFixedUser())
-            .setString(DOMAIN, source.getFixedDomain()))
-            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(row -> row.getString(MAPPING))
-                .collect(Guavate.toImmutableList()))
-            .join();
-
-        return MappingsImpl.fromCollection(mappings).toOptional();
-    }
-
     @Override
     public Map<MappingSource, Mappings> getAllMappings() {
-        return executor.execute(retrieveAllMappingsStatement.bind())
-            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING)))
-                .collect(Guavate.toImmutableMap(
-                    UserMapping::getSource,
-                    UserMapping::toMapping,
-                    Mappings::union)))
-            .join();
-    }
-
-    private static class UserMapping {
-
-        private final MappingSource source;
-        private final String mapping;
-
-        public UserMapping(MappingSource source, String mapping) {
-            this.source = source;
-            this.mapping = mapping;
-        }
-
-
-        public MappingSource getSource() {
-            return source;
-        }
-
-        public String getMapping() {
-            return mapping;
-        }
-
-        public Mappings toMapping() {
-            return MappingsImpl.fromRawString(getMapping());
-        }
-
+        return dao.getAllMappings().block();
     }
 
     @Override
     protected Mappings mapAddress(String user, Domain domain) {
         return OptionalUtils.orSuppliers(
-            () -> retrieveMappings(MappingSource.fromUser(user, domain)),
-            () -> retrieveMappings(MappingSource.fromDomain(domain)))
+            () -> dao.retrieveMappings(MappingSource.fromUser(user, domain)).blockOptional(),
+            () -> dao.retrieveMappings(MappingSource.fromDomain(domain)).blockOptional())
                 .orElse(MappingsImpl.empty());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
new file mode 100644
index 0000000..52f3516
--- /dev/null
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -0,0 +1,150 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.DOMAIN;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.MAPPING;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.rrt.lib.Mappings;
+import org.apache.james.rrt.lib.MappingsImpl;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.github.steveash.guavate.Guavate;
+
+import reactor.core.publisher.Mono;
+
+class CassandraRecipientRewriteTableDAO {
+    private final CassandraAsyncExecutor executor;
+    private final CassandraUtils cassandraUtils;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement deleteStatement;
+    private final PreparedStatement retrieveMappingStatement;
+    private final PreparedStatement retrieveAllMappingsStatement;
+
+    @Inject
+    CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
+        this.executor = new CassandraAsyncExecutor(session);
+        this.cassandraUtils = cassandraUtils;
+        this.insertStatement = prepareInsertStatement(session);
+        this.deleteStatement = prepareDelete(session);
+        this.retrieveMappingStatement = prepareRetrieveMappingStatement(session);
+        this.retrieveAllMappingsStatement = prepareRetrieveAllMappingStatement(session);
+    }
+
+    private PreparedStatement prepareRetrieveAllMappingStatement(Session session) {
+        return session.prepare(select(USER, DOMAIN, MAPPING)
+            .from(TABLE_NAME));
+    }
+
+    private PreparedStatement prepareRetrieveMappingStatement(Session session) {
+        return session.prepare(select(MAPPING)
+            .from(TABLE_NAME)
+            .where(eq(USER, bindMarker(USER)))
+            .and(eq(DOMAIN, bindMarker(DOMAIN))));
+    }
+
+    private PreparedStatement prepareDelete(Session session) {
+        return session.prepare(delete()
+            .from(TABLE_NAME)
+            .where(eq(USER, bindMarker(USER)))
+            .and(eq(DOMAIN, bindMarker(DOMAIN)))
+            .and(eq(MAPPING, bindMarker(MAPPING))));
+    }
+
+    private PreparedStatement prepareInsertStatement(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(USER, bindMarker(USER))
+            .value(DOMAIN, bindMarker(DOMAIN))
+            .value(MAPPING, bindMarker(MAPPING)));
+    }
+
+    Mono<Void> addMapping(MappingSource source, Mapping mapping) {
+        return executor.executeVoidReactor(insertStatement.bind()
+            .setString(USER, source.getFixedUser())
+            .setString(DOMAIN, source.getFixedDomain())
+            .setString(MAPPING, mapping.asString()));
+    }
+
+    Mono<Void> removeMapping(MappingSource source, Mapping mapping) {
+        return executor.executeVoidReactor(deleteStatement.bind()
+            .setString(USER, source.getFixedUser())
+            .setString(DOMAIN, source.getFixedDomain())
+            .setString(MAPPING, mapping.asString()));
+    }
+
+    Mono<MappingsImpl> retrieveMappings(MappingSource source) {
+        return executor.executeReactor(retrieveMappingStatement.bind()
+            .setString(USER, source.getFixedUser())
+            .setString(DOMAIN, source.getFixedDomain()))
+            .map(resultSet -> cassandraUtils.convertToStream(resultSet)
+                .map(row -> row.getString(MAPPING))
+                .collect(Guavate.toImmutableList()))
+            .map(MappingsImpl::fromCollection)
+            .filter(mappings -> !mappings.isEmpty());
+    }
+
+    Mono<Map<MappingSource, Mappings>> getAllMappings() {
+        return executor.executeReactor(retrieveAllMappingsStatement.bind())
+            .map(resultSet -> cassandraUtils.convertToStream(resultSet)
+                .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING)))
+                .collect(Guavate.toImmutableMap(
+                    UserMapping::getSource,
+                    UserMapping::toMapping,
+                    Mappings::union)));
+    }
+
+    private static class UserMapping {
+        private final MappingSource source;
+        private final String mapping;
+
+        UserMapping(MappingSource source, String mapping) {
+            this.source = source;
+            this.mapping = mapping;
+        }
+
+        MappingSource getSource() {
+            return source;
+        }
+
+        String getMapping() {
+            return mapping;
+        }
+
+        Mappings toMapping() {
+            return MappingsImpl.fromRawString(getMapping());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
new file mode 100644
index 0000000..0d2f096
--- /dev/null
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
@@ -0,0 +1,100 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.core.Domain;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.rrt.lib.MappingsImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class CassandraRecipientRewriteTableDAOTest {
+    private static final String USER = "test";
+    private static final String ADDRESS = "test@domain";
+    private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
+    private static final Mapping MAPPING = Mapping.alias(ADDRESS);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);
+
+    private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+    }
+
+    @Test
+    void retrieveMappingsShouldReturnEmptyByDefault() {
+        assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
+            .isEmpty();
+    }
+
+    @Test
+    void getAllMappingsShouldReturnEmptyByDefault() {
+        assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
+            .isEmpty();
+    }
+
+    @Test
+    void retrieveMappingsShouldReturnStoredMapping() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
+            .contains(MappingsImpl.fromMappings(MAPPING));
+    }
+
+    @Test
+    void getAllMappingsShouldReturnStoredMapping() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
+            .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING)));
+    }
+
+    @Test
+    void retrieveMappingsShouldNotReturnRemovedMapping() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block();
+
+        assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
+            .isEmpty();
+    }
+
+    @Test
+    void getAllMappingsShouldNotReturnRemovedMapping() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block();
+
+        assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
+            .isEmpty();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
index c7d6d6f..d0bc05c 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
@@ -61,7 +61,8 @@ public class CassandraRecipientRewriteTableTest extends AbstractRecipientRewrite
 
     @Override
     protected AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception {
-        CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(
+            new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION));
         rrt.configure(new DefaultConfigurationBuilder());
         return rrt;
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
index 3c2916c..edec0ca 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
@@ -49,7 +49,8 @@ public class CassandraStepdefs {
     }
 
     private AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception {
-        CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(
+            new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION));
         rrt.configure(new DefaultConfigurationBuilder());
         return rrt;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[3/3] james-project git commit: MAILBOX-372 Test dispatching from the listeners

Posted by bt...@apache.org.
MAILBOX-372 Test dispatching from the listeners

This behaviour corresponds to ListeningCurrentQuotaUpdater

We demonstrate here that InVmEventDelivery does not support dispatch while retrying.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/a760297c
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/a760297c
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/a760297c

Branch: refs/heads/master
Commit: a760297cc9b795336c659b7d6efce6109be384dc
Parents: b3ec659
Author: Benoit Tellier <bt...@linagora.com>
Authored: Fri Jan 18 13:03:39 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 18 18:02:32 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/ErrorHandlingContract.java   | 35 +++++++++++++++++---
 .../james/mailbox/events/GroupContract.java     | 18 ++++++++++
 .../events/delivery/InVmEventDelivery.java      |  1 +
 3 files changed, 49 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/a760297c/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index f4c0768..c10c542 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -20,6 +20,9 @@
 package org.apache.james.mailbox.events;
 
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_ID;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -30,6 +33,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
@@ -70,7 +74,7 @@ interface ErrorHandlingContract extends EventBusContract {
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
-        eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+        eventBus().register(eventCollector, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
         WAIT_CONDITION
@@ -87,7 +91,7 @@ interface ErrorHandlingContract extends EventBusContract {
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
-        eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+        eventBus().register(eventCollector, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
         WAIT_CONDITION
@@ -105,7 +109,7 @@ interface ErrorHandlingContract extends EventBusContract {
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
-        eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+        eventBus().register(eventCollector, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
         TimeUnit.SECONDS.sleep(1);
@@ -117,7 +121,7 @@ interface ErrorHandlingContract extends EventBusContract {
     default void exceedingMaxRetriesShouldStopConsumingFailedEvent() throws Exception {
         ThrowingListener throwingListener = throwingListener();
 
-        eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+        eventBus().register(throwingListener, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
         TimeUnit.SECONDS.sleep(5);
@@ -132,7 +136,7 @@ interface ErrorHandlingContract extends EventBusContract {
     default void retriesBackOffShouldDelayByExponentialGrowth() throws Exception {
         ThrowingListener throwingListener = throwingListener();
 
-        eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+        eventBus().register(throwingListener, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
         TimeUnit.SECONDS.sleep(5);
@@ -154,4 +158,25 @@ interface ErrorHandlingContract extends EventBusContract {
                 .isAfterOrEqualTo(timeElapsed.get(2).plusMillis(minThirdDelayAfter));
         });
     }
+
+    @Test
+    default void retryingListenerCallingDispatchShouldNotFail() {
+        AtomicBoolean firstExecution = new AtomicBoolean(true);
+        AtomicBoolean successfulRetry = new AtomicBoolean(false);
+        MailboxListener listener = event -> {
+            if (event.getEventId().equals(EVENT_ID)) {
+                if (firstExecution.get()) {
+                    firstExecution.set(false);
+                    throw new RuntimeException();
+                }
+                eventBus().dispatch(EVENT_2, NO_KEYS).block();
+                successfulRetry.set(true);
+            }
+        };
+
+        eventBus().register(listener, GROUP_A);
+        eventBus().dispatch(EVENT, NO_KEYS).block();
+
+        WAIT_CONDITION.until(successfulRetry::get);
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/a760297c/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 870f75f..93194e4 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.events;
 
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_ID;
 import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
@@ -39,6 +40,8 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.james.core.User;
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
@@ -53,6 +56,21 @@ import com.google.common.collect.ImmutableSortedMap;
 public interface GroupContract {
 
     interface SingleEventBusGroupContract extends EventBusContract {
+        @Test
+        default void listenersShouldBeAbleToDispatch() { // Checks that a listener can dispatch a second event on the same bus while it is handling one.
+            AtomicBoolean successfulRetry = new AtomicBoolean(false); // flipped to true only after the nested dispatch below has returned; despite the name, nothing is retried in this test
+            MailboxListener listener = event -> {
+                if (event.getEventId().equals(EVENT_ID)) { // react only to events carrying EVENT_ID; presumably this matches EVENT but not EVENT_2, so the nested dispatch does not recurse -- TODO confirm in EventBusTestFixture
+                    eventBus().dispatch(EVENT_2, NO_KEYS).block(); // nested dispatch issued from inside the listener, blocking until it completes
+                    successfulRetry.set(true);
+                }
+            };
+
+            eventBus().register(listener, GROUP_A);
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            WAIT_CONDITION.until(successfulRetry::get); // wait until the listener has got past its nested dispatch
+        }
 
         @Test
         default void listenerGroupShouldReceiveEvents() throws Exception {

http://git-wip-us.apache.org/repos/asf/james-project/blob/a760297c/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index c5446dd..e7eb233 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -82,6 +82,7 @@ public class InVmEventDelivery implements EventDelivery {
                 listenerName(mailboxListener),
                 eventName(event),
                 throwable))
+            .subscribeOn(Schedulers.elastic())
             .retryBackoff(MAX_RETRIES, FIRST_BACKOFF, MAX_BACKOFF, DEFAULT_JITTER_FACTOR)
             .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
                 listenerName(mailboxListener),


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/3] james-project git commit: MAILBOX-372 RabbitMQEventBus should receive events that were not fully processed

Posted by bt...@apache.org.
MAILBOX-372 RabbitMQEventBus should receive events that were not fully processed


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b3ec659c
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b3ec659c
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b3ec659c

Branch: refs/heads/master
Commit: b3ec659cf6485be441be81fe515f6cbe4919d5a1
Parents: c216bce
Author: tran tien duc <dt...@linagora.com>
Authored: Thu Jan 17 15:47:02 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 18 18:02:08 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/RabbitMQEventBusTest.java    | 40 ++++++++++++++++++++
 1 file changed, 40 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b3ec659c/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index ffcdac8..feaf712 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -32,12 +32,16 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
 import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
@@ -63,6 +67,7 @@ import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.stubbing.Answer;
 
 import com.rabbitmq.client.Connection;
 
@@ -200,6 +205,41 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     }
 
     @Nested
+    class AtLeastOnceTest { // Groups the test(s) about an event being handed to another bus when its processing never completed.
+
+        @Test
+        void inProcessingEventShouldBeReDispatchedToAnotherEventBusWhenOneIsDown() { // Stops two buses in turn while their listeners are stuck mid-event and waits for the third bus's listener to see the event.
+            MailboxListenerCountingSuccessfulExecution eventBusListener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution()); // spies, so event(...) can be intercepted below while the real method still runs
+            MailboxListenerCountingSuccessfulExecution eventBus2Listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+            MailboxListenerCountingSuccessfulExecution eventBus3Listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+            Answer callEventAndSleepForever = invocation -> { // stub behaviour: run the real event(...) and then never return. NOTE(review): raw Answer type; Answer<Void> would probably avoid a raw-type warning -- confirm
+                invocation.callRealMethod();
+                TimeUnit.SECONDS.sleep(Long.MAX_VALUE); // keeps the listener invocation from ever completing
+                return null;
+            };
+
+            doAnswer(callEventAndSleepForever).when(eventBusListener).event(any()); // only the first two listeners are made to hang;
+            doAnswer(callEventAndSleepForever).when(eventBus2Listener).event(any()); // eventBus3Listener is left unstubbed
+
+            eventBus.register(eventBusListener, GROUP_A); // all three listeners are registered under the same group, one per bus
+            eventBus2.register(eventBus2Listener, GROUP_A);
+            eventBus3.register(eventBus3Listener, GROUP_A);
+
+            eventBus.dispatch(EVENT, NO_KEYS).block();
+            WAIT_CONDITION
+                .until(() -> assertThat(eventBusListener.numberOfEventCalls()).isEqualTo(1)); // the first bus's listener reports one event call
+            eventBus.stop(); // stop the first bus while its listener is still blocked in the stub
+
+            WAIT_CONDITION
+                .until(() -> assertThat(eventBus2Listener.numberOfEventCalls()).isEqualTo(1)); // the second bus's listener now reports one event call
+            eventBus2.stop(); // same again: stop it while its listener is blocked
+
+            WAIT_CONDITION
+                .until(() -> assertThat(eventBus3Listener.numberOfEventCalls()).isEqualTo(1)); // finally the third, unstubbed listener reports one event call
+        }
+    }
+
+    @Nested
     class PublishingTest {
         private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org