You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/23 06:17:17 UTC
[camel] branch exchange-factory updated (70c02cf -> 6b07205)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.
from 70c02cf CAMEL-16222: PooledExchangeFactory experiment
new 53930f2 CAMEL-16222: PooledExchangeFactory experiment
new 0516ef9 CAMEL-16222: PooledExchangeFactory experiment
new 6b07205 CAMEL-16222: PooledExchangeFactory experiment
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../camel/component/kafka/KafkaConsumerTest.java | 13 +++++++
.../mail/MailConsumerAuthenticatorTest.java | 9 +++++
.../apache/camel/pgevent/PgEventConsumerTest.java | 30 +++++++++++++--
.../component/rabbitmq/RabbitMQConsumerTest.java | 45 ++++++++++++++++++----
4 files changed, 86 insertions(+), 11 deletions(-)
[camel] 02/03: CAMEL-16222: PooledExchangeFactory experiment
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0516ef9c413bc42085d3ee489b27272d26dbf166
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 23 07:14:14 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../component/rabbitmq/RabbitMQConsumerTest.java | 45 ++++++++++++++++++----
1 file changed, 38 insertions(+), 7 deletions(-)
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index a281bc3..79d2e11 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -24,33 +24,50 @@ import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
public class RabbitMQConsumerTest {
+ private ExtendedCamelContext ecc = Mockito.mock(ExtendedCamelContext.class);
+ private ExchangeFactory ef = Mockito.mock(ExchangeFactory.class);
private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
private Connection conn = Mockito.mock(Connection.class);
private Processor processor = Mockito.mock(Processor.class);
private Channel channel = Mockito.mock(Channel.class);
+ private ExecutorServiceManager esm = Mockito.mock(ExecutorServiceManager.class);
@Test
public void testStoppingConsumerShutdownExecutor() throws Exception {
- RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
-
ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
+
Mockito.when(endpoint.createExecutor()).thenReturn(e);
Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
+ Mockito.when(endpoint.getCamelContext()).thenReturn(ecc);
+ Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ Mockito.when(ecc.getExchangeFactory()).thenReturn(ef);
+ Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef);
+ Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm);
+ Mockito.when(esm.shutdownNow(e)).then(i -> e.shutdownNow());
Mockito.when(conn.createChannel()).thenReturn(channel);
+ RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
consumer.doStart();
assertFalse(e.isShutdown());
@@ -60,13 +77,20 @@ public class RabbitMQConsumerTest {
@Test
public void testStoppingConsumerShutdownConnection() throws Exception {
- RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+ ExecutorService es = Executors.newFixedThreadPool(3);
- Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+ Mockito.when(endpoint.createExecutor()).thenReturn(es);
Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
Mockito.when(conn.createChannel()).thenReturn(channel);
+ Mockito.when(endpoint.getCamelContext()).thenReturn(ecc);
+ Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ Mockito.when(ecc.getExchangeFactory()).thenReturn(ef);
+ Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef);
+ Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm);
+ Mockito.when(esm.shutdownNow(es)).then(i -> es.shutdownNow());
+ RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
consumer.doStart();
consumer.doStop();
@@ -76,18 +100,25 @@ public class RabbitMQConsumerTest {
@Test
public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);
+ ExecutorService es = Executors.newFixedThreadPool(3);
- RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
-
- Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+ Mockito.when(endpoint.createExecutor()).thenReturn(es);
Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
Mockito.when(conn.createChannel()).thenReturn(channel);
Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
Mockito.when(channel.isOpen()).thenReturn(false);
+ Mockito.when(endpoint.getCamelContext()).thenReturn(ecc);
+ Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ Mockito.when(ecc.getExchangeFactory()).thenReturn(ef);
+ Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef);
+ Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm);
+ Mockito.when(esm.shutdownNow(es)).then(i -> es.shutdownNow());
+
Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
Mockito.doThrow(alreadyClosedException).when(channel).close();
+ RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
consumer.doStart();
consumer.doStop();
[camel] 01/03: CAMEL-16222: PooledExchangeFactory experiment
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 53930f287e0a2491baa5569a1882169bd805d318
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 23 07:04:14 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../mail/MailConsumerAuthenticatorTest.java | 9 +++++++
.../apache/camel/pgevent/PgEventConsumerTest.java | 30 +++++++++++++++++++---
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java
index e4ea961..c4f18a7 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java
@@ -23,12 +23,15 @@ import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeFactory;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
/**
@@ -51,8 +54,11 @@ public class MailConsumerAuthenticatorTest {
JavaMailSender sender = Mockito.mock(JavaMailSender.class);
Processor processor = Mockito.mock(Processor.class);
+ ExtendedCamelContext ecc = Mockito.mock(ExtendedCamelContext.class);
+ ExchangeFactory ef = Mockito.mock(ExchangeFactory.class);
MailEndpoint endpoint = new MailEndpoint();
+ endpoint.setCamelContext(ecc);
MailConfiguration configuration = new MailConfiguration();
configuration.setAuthenticator(authenticator);
configuration.configureProtocol(protocol);
@@ -65,6 +71,9 @@ public class MailConsumerAuthenticatorTest {
Session session = Session.getDefaultInstance(props, authenticator);
when(sender.getSession()).thenReturn(session);
+ when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
MailConsumer consumer = new MailConsumer(endpoint, processor, sender);
try {
diff --git a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
index 82ff432..196de5a 100644
--- a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
+++ b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
@@ -20,16 +20,19 @@ import java.sql.PreparedStatement;
import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.jdbc.PGDataSource;
-import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.pgevent.PgEventConsumer;
import org.apache.camel.component.pgevent.PgEventEndpoint;
+import org.apache.camel.spi.ExchangeFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -44,7 +47,13 @@ public class PgEventConsumerTest {
PreparedStatement statement = mock(PreparedStatement.class);
PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
Processor processor = mock(Processor.class);
+ ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+ ExchangeFactory ef = mock(ExchangeFactory.class);
+ when(endpoint.getCamelContext()).thenReturn(ecc);
+ when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
when(endpoint.getDatasource()).thenReturn(dataSource);
when(dataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement("LISTEN camel")).thenReturn(statement);
@@ -64,7 +73,13 @@ public class PgEventConsumerTest {
PreparedStatement statement = mock(PreparedStatement.class);
PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
Processor processor = mock(Processor.class);
+ ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+ ExchangeFactory ef = mock(ExchangeFactory.class);
+ when(endpoint.getCamelContext()).thenReturn(ecc);
+ when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
when(endpoint.getDatasource()).thenReturn(dataSource);
when(dataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement("LISTEN camel")).thenReturn(statement);
@@ -84,10 +99,17 @@ public class PgEventConsumerTest {
public void testPgEventNotification() throws Exception {
PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
Processor processor = mock(Processor.class);
- Exchange exchange = mock(Exchange.class);
+ ExtendedExchange exchange = mock(ExtendedExchange.class);
Message message = mock(Message.class);
-
- when(endpoint.createExchange()).thenReturn(exchange);
+ ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+ ExchangeFactory ef = mock(ExchangeFactory.class);
+
+ when(endpoint.getCamelContext()).thenReturn(ecc);
+ when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
+ when(ef.create(endpoint, false)).thenReturn(exchange);
+ when(exchange.adapt(ExtendedExchange.class)).thenReturn(exchange);
when(exchange.getIn()).thenReturn(message);
PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);
[camel] 03/03: CAMEL-16222: PooledExchangeFactory experiment
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6b0720575b89587cebdf9c8ea44c6ce4bd25a9ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 23 07:16:47 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../org/apache/camel/component/kafka/KafkaConsumerTest.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index eff7f14..26188c4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -16,8 +16,11 @@
*/
package org.apache.camel.component.kafka;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeFactory;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
@@ -31,9 +34,15 @@ public class KafkaConsumerTest {
private KafkaComponent component = mock(KafkaComponent.class);
private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
private Processor processor = mock(Processor.class);
+ private ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+ private ExchangeFactory ef = mock(ExchangeFactory.class);
@Test
public void consumerRequiresBootstrapServers() throws Exception {
+ when(endpoint.getCamelContext()).thenReturn(ecc);
+ when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
when(endpoint.getComponent()).thenReturn(component);
when(endpoint.getConfiguration()).thenReturn(configuration);
when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
@@ -45,6 +54,10 @@ public class KafkaConsumerTest {
@Test
public void consumerOnlyRequiresBootstrapServers() throws Exception {
+ when(endpoint.getCamelContext()).thenReturn(ecc);
+ when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
when(endpoint.getComponent()).thenReturn(component);
when(endpoint.getConfiguration()).thenReturn(configuration);
when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:2181");