You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/18 11:06:57 UTC
[1/3] james-project git commit: JAMES-2645 Refactor cassandra RRTs by
adding a proper DAO class with basic tests
Repository: james-project
Updated Branches:
refs/heads/master 07bc44463 -> a760297cc
JAMES-2645 Refactor cassandra RRTs by adding a proper DAO class with basic tests
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c216bce6
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c216bce6
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c216bce6
Branch: refs/heads/master
Commit: c216bce6a30037fc7115068599dc15c89a400a07
Parents: 07bc444
Author: Rene Cordier <rc...@linagora.com>
Authored: Thu Jan 17 11:40:38 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 18 18:01:20 2019 +0700
----------------------------------------------------------------------
.../CassandraRecipientRewriteTable.java | 128 ++--------------
.../CassandraRecipientRewriteTableDAO.java | 150 +++++++++++++++++++
.../CassandraRecipientRewriteTableDAOTest.java | 100 +++++++++++++
.../CassandraRecipientRewriteTableTest.java | 3 +-
.../james/rrt/cassandra/CassandraStepdefs.java | 3 +-
5 files changed, 264 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 332ec43..27bc7d4 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -18,24 +18,10 @@
****************************************************************/
package org.apache.james.rrt.cassandra;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.DOMAIN;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.MAPPING;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME;
-import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER;
-
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
import javax.inject.Inject;
-import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.core.Domain;
import org.apache.james.rrt.lib.AbstractRecipientRewriteTable;
import org.apache.james.rrt.lib.Mapping;
@@ -44,135 +30,41 @@ import org.apache.james.rrt.lib.Mappings;
import org.apache.james.rrt.lib.MappingsImpl;
import org.apache.james.util.OptionalUtils;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.github.steveash.guavate.Guavate;
-
public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
-
- private final CassandraAsyncExecutor executor;
- private final CassandraUtils cassandraUtils;
- private final PreparedStatement insertStatement;
- private final PreparedStatement deleteStatement;
- private final PreparedStatement retrieveMappingStatement;
- private final PreparedStatement retrieveAllMappingsStatement;
+ private final CassandraRecipientRewriteTableDAO dao;
@Inject
- public CassandraRecipientRewriteTable(Session session, CassandraUtils cassandraUtils) {
- this.executor = new CassandraAsyncExecutor(session);
- this.cassandraUtils = cassandraUtils;
- this.insertStatement = prepareInsertStatement(session);
- this.deleteStatement = prepareDelete(session);
- this.retrieveMappingStatement = prepareRetrieveMappingStatement(session);
- this.retrieveAllMappingsStatement = prepareRetrieveAllMappingStatement(session);
- }
-
- private PreparedStatement prepareRetrieveAllMappingStatement(Session session) {
- return session.prepare(select(USER, DOMAIN, MAPPING)
- .from(TABLE_NAME));
- }
-
- private PreparedStatement prepareRetrieveMappingStatement(Session session) {
- return session.prepare(select(MAPPING)
- .from(TABLE_NAME)
- .where(eq(USER, bindMarker(USER)))
- .and(eq(DOMAIN, bindMarker(DOMAIN))));
- }
-
- private PreparedStatement prepareDelete(Session session) {
- return session.prepare(delete()
- .from(TABLE_NAME)
- .where(eq(USER, bindMarker(USER)))
- .and(eq(DOMAIN, bindMarker(DOMAIN)))
- .and(eq(MAPPING, bindMarker(MAPPING))));
- }
-
- private PreparedStatement prepareInsertStatement(Session session) {
- return session.prepare(insertInto(TABLE_NAME)
- .value(USER, bindMarker(USER))
- .value(DOMAIN, bindMarker(DOMAIN))
- .value(MAPPING, bindMarker(MAPPING)));
+ public CassandraRecipientRewriteTable(CassandraRecipientRewriteTableDAO dao) {
+ this.dao = dao;
}
@Override
public void addMapping(MappingSource source, Mapping mapping) {
- executor.executeVoid(insertStatement.bind()
- .setString(USER, source.getFixedUser())
- .setString(DOMAIN, source.getFixedDomain())
- .setString(MAPPING, mapping.asString()))
- .join();
+ dao.addMapping(source, mapping).block();
}
@Override
public void removeMapping(MappingSource source, Mapping mapping) {
- executor.executeVoid(deleteStatement.bind()
- .setString(USER, source.getFixedUser())
- .setString(DOMAIN, source.getFixedDomain())
- .setString(MAPPING, mapping.asString()))
- .join();
+ dao.removeMapping(source, mapping).block();
}
@Override
public Mappings getStoredMappings(MappingSource source) {
- return retrieveMappings(source)
+ return dao.retrieveMappings(source)
+ .blockOptional()
.orElse(MappingsImpl.empty());
}
- private Optional<Mappings> retrieveMappings(MappingSource source) {
- List<String> mappings = executor.execute(retrieveMappingStatement.bind()
- .setString(USER, source.getFixedUser())
- .setString(DOMAIN, source.getFixedDomain()))
- .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
- .map(row -> row.getString(MAPPING))
- .collect(Guavate.toImmutableList()))
- .join();
-
- return MappingsImpl.fromCollection(mappings).toOptional();
- }
-
@Override
public Map<MappingSource, Mappings> getAllMappings() {
- return executor.execute(retrieveAllMappingsStatement.bind())
- .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
- .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING)))
- .collect(Guavate.toImmutableMap(
- UserMapping::getSource,
- UserMapping::toMapping,
- Mappings::union)))
- .join();
- }
-
- private static class UserMapping {
-
- private final MappingSource source;
- private final String mapping;
-
- public UserMapping(MappingSource source, String mapping) {
- this.source = source;
- this.mapping = mapping;
- }
-
-
- public MappingSource getSource() {
- return source;
- }
-
- public String getMapping() {
- return mapping;
- }
-
- public Mappings toMapping() {
- return MappingsImpl.fromRawString(getMapping());
- }
-
+ return dao.getAllMappings().block();
}
@Override
protected Mappings mapAddress(String user, Domain domain) {
return OptionalUtils.orSuppliers(
- () -> retrieveMappings(MappingSource.fromUser(user, domain)),
- () -> retrieveMappings(MappingSource.fromDomain(domain)))
+ () -> dao.retrieveMappings(MappingSource.fromUser(user, domain)).blockOptional(),
+ () -> dao.retrieveMappings(MappingSource.fromDomain(domain)).blockOptional())
.orElse(MappingsImpl.empty());
}
-
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
new file mode 100644
index 0000000..52f3516
--- /dev/null
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -0,0 +1,150 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.DOMAIN;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.MAPPING;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME;
+import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.rrt.lib.Mappings;
+import org.apache.james.rrt.lib.MappingsImpl;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.github.steveash.guavate.Guavate;
+
+import reactor.core.publisher.Mono;
+
+class CassandraRecipientRewriteTableDAO {
+ private final CassandraAsyncExecutor executor;
+ private final CassandraUtils cassandraUtils;
+ private final PreparedStatement insertStatement;
+ private final PreparedStatement deleteStatement;
+ private final PreparedStatement retrieveMappingStatement;
+ private final PreparedStatement retrieveAllMappingsStatement;
+
+ @Inject
+ CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
+ this.executor = new CassandraAsyncExecutor(session);
+ this.cassandraUtils = cassandraUtils;
+ this.insertStatement = prepareInsertStatement(session);
+ this.deleteStatement = prepareDelete(session);
+ this.retrieveMappingStatement = prepareRetrieveMappingStatement(session);
+ this.retrieveAllMappingsStatement = prepareRetrieveAllMappingStatement(session);
+ }
+
+ private PreparedStatement prepareRetrieveAllMappingStatement(Session session) {
+ return session.prepare(select(USER, DOMAIN, MAPPING)
+ .from(TABLE_NAME));
+ }
+
+ private PreparedStatement prepareRetrieveMappingStatement(Session session) {
+ return session.prepare(select(MAPPING)
+ .from(TABLE_NAME)
+ .where(eq(USER, bindMarker(USER)))
+ .and(eq(DOMAIN, bindMarker(DOMAIN))));
+ }
+
+ private PreparedStatement prepareDelete(Session session) {
+ return session.prepare(delete()
+ .from(TABLE_NAME)
+ .where(eq(USER, bindMarker(USER)))
+ .and(eq(DOMAIN, bindMarker(DOMAIN)))
+ .and(eq(MAPPING, bindMarker(MAPPING))));
+ }
+
+ private PreparedStatement prepareInsertStatement(Session session) {
+ return session.prepare(insertInto(TABLE_NAME)
+ .value(USER, bindMarker(USER))
+ .value(DOMAIN, bindMarker(DOMAIN))
+ .value(MAPPING, bindMarker(MAPPING)));
+ }
+
+ Mono<Void> addMapping(MappingSource source, Mapping mapping) {
+ return executor.executeVoidReactor(insertStatement.bind()
+ .setString(USER, source.getFixedUser())
+ .setString(DOMAIN, source.getFixedDomain())
+ .setString(MAPPING, mapping.asString()));
+ }
+
+ Mono<Void> removeMapping(MappingSource source, Mapping mapping) {
+ return executor.executeVoidReactor(deleteStatement.bind()
+ .setString(USER, source.getFixedUser())
+ .setString(DOMAIN, source.getFixedDomain())
+ .setString(MAPPING, mapping.asString()));
+ }
+
+ Mono<MappingsImpl> retrieveMappings(MappingSource source) {
+ return executor.executeReactor(retrieveMappingStatement.bind()
+ .setString(USER, source.getFixedUser())
+ .setString(DOMAIN, source.getFixedDomain()))
+ .map(resultSet -> cassandraUtils.convertToStream(resultSet)
+ .map(row -> row.getString(MAPPING))
+ .collect(Guavate.toImmutableList()))
+ .map(MappingsImpl::fromCollection)
+ .filter(mappings -> !mappings.isEmpty());
+ }
+
+ Mono<Map<MappingSource, Mappings>> getAllMappings() {
+ return executor.executeReactor(retrieveAllMappingsStatement.bind())
+ .map(resultSet -> cassandraUtils.convertToStream(resultSet)
+ .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING)))
+ .collect(Guavate.toImmutableMap(
+ UserMapping::getSource,
+ UserMapping::toMapping,
+ Mappings::union)));
+ }
+
+ private static class UserMapping {
+ private final MappingSource source;
+ private final String mapping;
+
+ UserMapping(MappingSource source, String mapping) {
+ this.source = source;
+ this.mapping = mapping;
+ }
+
+ MappingSource getSource() {
+ return source;
+ }
+
+ String getMapping() {
+ return mapping;
+ }
+
+ Mappings toMapping() {
+ return MappingsImpl.fromRawString(getMapping());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
new file mode 100644
index 0000000..0d2f096
--- /dev/null
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
@@ -0,0 +1,100 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.core.Domain;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.rrt.lib.MappingsImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class CassandraRecipientRewriteTableDAOTest {
+ private static final String USER = "test";
+ private static final String ADDRESS = "test@domain";
+ private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
+ private static final Mapping MAPPING = Mapping.alias(ADDRESS);
+
+ @RegisterExtension
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);
+
+ private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+
+ @BeforeEach
+ void setUp(CassandraCluster cassandra) {
+ cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+ }
+
+ @Test
+ void retrieveMappingsShouldReturnEmptyByDefault() {
+ assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
+ .isEmpty();
+ }
+
+ @Test
+ void getAllMappingsShouldReturnEmptyByDefault() {
+ assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
+ .isEmpty();
+ }
+
+ @Test
+ void retrieveMappingsShouldReturnStoredMapping() {
+ cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+ assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
+ .contains(MappingsImpl.fromMappings(MAPPING));
+ }
+
+ @Test
+ void getAllMappingsShouldReturnStoredMapping() {
+ cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+ assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
+ .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING)));
+ }
+
+ @Test
+ void retrieveMappingsShouldNotReturnRemovedMapping() {
+ cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+ cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block();
+
+ assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional())
+ .isEmpty();
+ }
+
+ @Test
+ void getAllMappingsShouldNotReturnRemovedMapping() {
+ cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+ cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block();
+
+ assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block())
+ .isEmpty();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
index c7d6d6f..d0bc05c 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
@@ -61,7 +61,8 @@ public class CassandraRecipientRewriteTableTest extends AbstractRecipientRewrite
@Override
protected AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception {
- CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+ CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(
+ new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION));
rrt.configure(new DefaultConfigurationBuilder());
return rrt;
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
index 3c2916c..edec0ca 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
@@ -49,7 +49,8 @@ public class CassandraStepdefs {
}
private AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception {
- CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+ CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(
+ new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION));
rrt.configure(new DefaultConfigurationBuilder());
return rrt;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[3/3] james-project git commit: MAILBOX-372 Test dispatching from the
listeners
Posted by bt...@apache.org.
MAILBOX-372 Test dispatching from the listeners
This behaviour corresponds to ListeningCurrentQuotaUpdater
We demonstrate here that InVMEventDelivery do not support dispatch while retrying.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/a760297c
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/a760297c
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/a760297c
Branch: refs/heads/master
Commit: a760297cc9b795336c659b7d6efce6109be384dc
Parents: b3ec659
Author: Benoit Tellier <bt...@linagora.com>
Authored: Fri Jan 18 13:03:39 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 18 18:02:32 2019 +0700
----------------------------------------------------------------------
.../mailbox/events/ErrorHandlingContract.java | 35 +++++++++++++++++---
.../james/mailbox/events/GroupContract.java | 18 ++++++++++
.../events/delivery/InVmEventDelivery.java | 1 +
3 files changed, 49 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/a760297c/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index f4c0768..c10c542 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -20,6 +20,9 @@
package org.apache.james.mailbox.events;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_ID;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
import static org.assertj.core.api.Assertions.assertThat;
@@ -30,6 +33,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
@@ -70,7 +74,7 @@ interface ErrorHandlingContract extends EventBusContract {
.doCallRealMethod()
.when(eventCollector).event(EVENT);
- eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+ eventBus().register(eventCollector, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
WAIT_CONDITION
@@ -87,7 +91,7 @@ interface ErrorHandlingContract extends EventBusContract {
.doCallRealMethod()
.when(eventCollector).event(EVENT);
- eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+ eventBus().register(eventCollector, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
WAIT_CONDITION
@@ -105,7 +109,7 @@ interface ErrorHandlingContract extends EventBusContract {
.doCallRealMethod()
.when(eventCollector).event(EVENT);
- eventBus().register(eventCollector, new EventBusTestFixture.GroupA());
+ eventBus().register(eventCollector, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
TimeUnit.SECONDS.sleep(1);
@@ -117,7 +121,7 @@ interface ErrorHandlingContract extends EventBusContract {
default void exceedingMaxRetriesShouldStopConsumingFailedEvent() throws Exception {
ThrowingListener throwingListener = throwingListener();
- eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+ eventBus().register(throwingListener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
TimeUnit.SECONDS.sleep(5);
@@ -132,7 +136,7 @@ interface ErrorHandlingContract extends EventBusContract {
default void retriesBackOffShouldDelayByExponentialGrowth() throws Exception {
ThrowingListener throwingListener = throwingListener();
- eventBus().register(throwingListener, new EventBusTestFixture.GroupA());
+ eventBus().register(throwingListener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
TimeUnit.SECONDS.sleep(5);
@@ -154,4 +158,25 @@ interface ErrorHandlingContract extends EventBusContract {
.isAfterOrEqualTo(timeElapsed.get(2).plusMillis(minThirdDelayAfter));
});
}
+
+ @Test
+ default void retryingListenerCallingDispatchShouldNotFail() {
+ AtomicBoolean firstExecution = new AtomicBoolean(true);
+ AtomicBoolean successfulRetry = new AtomicBoolean(false);
+ MailboxListener listener = event -> {
+ if (event.getEventId().equals(EVENT_ID)) {
+ if (firstExecution.get()) {
+ firstExecution.set(false);
+ throw new RuntimeException();
+ }
+ eventBus().dispatch(EVENT_2, NO_KEYS).block();
+ successfulRetry.set(true);
+ }
+ };
+
+ eventBus().register(listener, GROUP_A);
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ WAIT_CONDITION.until(successfulRetry::get);
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/a760297c/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 870f75f..93194e4 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.events;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_ID;
import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
@@ -39,6 +40,8 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.james.core.User;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
@@ -53,6 +56,21 @@ import com.google.common.collect.ImmutableSortedMap;
public interface GroupContract {
interface SingleEventBusGroupContract extends EventBusContract {
+ @Test
+ default void listenersShouldBeAbleToDispatch() {
+ AtomicBoolean successfulRetry = new AtomicBoolean(false);
+ MailboxListener listener = event -> {
+ if (event.getEventId().equals(EVENT_ID)) {
+ eventBus().dispatch(EVENT_2, NO_KEYS).block();
+ successfulRetry.set(true);
+ }
+ };
+
+ eventBus().register(listener, GROUP_A);
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ WAIT_CONDITION.until(successfulRetry::get);
+ }
@Test
default void listenerGroupShouldReceiveEvents() throws Exception {
http://git-wip-us.apache.org/repos/asf/james-project/blob/a760297c/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index c5446dd..e7eb233 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -82,6 +82,7 @@ public class InVmEventDelivery implements EventDelivery {
listenerName(mailboxListener),
eventName(event),
throwable))
+ .subscribeOn(Schedulers.elastic())
.retryBackoff(MAX_RETRIES, FIRST_BACKOFF, MAX_BACKOFF, DEFAULT_JITTER_FACTOR)
.doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
listenerName(mailboxListener),
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[2/3] james-project git commit: MAILBOX-372 RabbitMQEventBus should
receive events didn't fully processed
Posted by bt...@apache.org.
MAILBOX-372 RabbitMQEventBus should receive events didn't fully processed
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b3ec659c
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b3ec659c
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b3ec659c
Branch: refs/heads/master
Commit: b3ec659cf6485be441be81fe515f6cbe4919d5a1
Parents: c216bce
Author: tran tien duc <dt...@linagora.com>
Authored: Thu Jan 17 15:47:02 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 18 18:02:08 2019 +0700
----------------------------------------------------------------------
.../mailbox/events/RabbitMQEventBusTest.java | 40 ++++++++++++++++++++
1 file changed, 40 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/b3ec659c/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index ffcdac8..feaf712 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -32,12 +32,16 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -63,6 +67,7 @@ import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.stubbing.Answer;
import com.rabbitmq.client.Connection;
@@ -200,6 +205,41 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Nested
+ class AtLeastOnceTest {
+
+ @Test
+ void inProcessingEventShouldBeReDispatchedToAnotherEventBusWhenOneIsDown() {
+ MailboxListenerCountingSuccessfulExecution eventBusListener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+ MailboxListenerCountingSuccessfulExecution eventBus2Listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+ MailboxListenerCountingSuccessfulExecution eventBus3Listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+ Answer callEventAndSleepForever = invocation -> {
+ invocation.callRealMethod();
+ TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
+ return null;
+ };
+
+ doAnswer(callEventAndSleepForever).when(eventBusListener).event(any());
+ doAnswer(callEventAndSleepForever).when(eventBus2Listener).event(any());
+
+ eventBus.register(eventBusListener, GROUP_A);
+ eventBus2.register(eventBus2Listener, GROUP_A);
+ eventBus3.register(eventBus3Listener, GROUP_A);
+
+ eventBus.dispatch(EVENT, NO_KEYS).block();
+ WAIT_CONDITION
+ .until(() -> assertThat(eventBusListener.numberOfEventCalls()).isEqualTo(1));
+ eventBus.stop();
+
+ WAIT_CONDITION
+ .until(() -> assertThat(eventBus2Listener.numberOfEventCalls()).isEqualTo(1));
+ eventBus2.stop();
+
+ WAIT_CONDITION
+ .until(() -> assertThat(eventBus3Listener.numberOfEventCalls()).isEqualTo(1));
+ }
+ }
+
+ @Nested
class PublishingTest {
private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org