You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/23 06:17:17 UTC

[camel] branch exchange-factory updated (70c02cf -> 6b07205)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 70c02cf  CAMEL-16222: PooledExchangeFactory experiment
     new 53930f2  CAMEL-16222: PooledExchangeFactory experiment
     new 0516ef9  CAMEL-16222: PooledExchangeFactory experiment
     new 6b07205  CAMEL-16222: PooledExchangeFactory experiment

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/kafka/KafkaConsumerTest.java   | 13 +++++++
 .../mail/MailConsumerAuthenticatorTest.java        |  9 +++++
 .../apache/camel/pgevent/PgEventConsumerTest.java  | 30 +++++++++++++--
 .../component/rabbitmq/RabbitMQConsumerTest.java   | 45 ++++++++++++++++++----
 4 files changed, 86 insertions(+), 11 deletions(-)


[camel] 02/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0516ef9c413bc42085d3ee489b27272d26dbf166
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 23 07:14:14 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/rabbitmq/RabbitMQConsumerTest.java   | 45 ++++++++++++++++++----
 1 file changed, 38 insertions(+), 7 deletions(-)

diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index a281bc3..79d2e11 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -24,33 +24,50 @@ import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Consumer;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
 
 public class RabbitMQConsumerTest {
 
+    private ExtendedCamelContext ecc = Mockito.mock(ExtendedCamelContext.class);
+    private ExchangeFactory ef = Mockito.mock(ExchangeFactory.class);
     private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
     private Connection conn = Mockito.mock(Connection.class);
     private Processor processor = Mockito.mock(Processor.class);
     private Channel channel = Mockito.mock(Channel.class);
+    private ExecutorServiceManager esm = Mockito.mock(ExecutorServiceManager.class);
 
     @Test
     public void testStoppingConsumerShutdownExecutor() throws Exception {
-        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
-
         ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
+
         Mockito.when(endpoint.createExecutor()).thenReturn(e);
         Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
         Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
+        Mockito.when(endpoint.getCamelContext()).thenReturn(ecc);
+        Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        Mockito.when(ecc.getExchangeFactory()).thenReturn(ef);
+        Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef);
+        Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm);
+        Mockito.when(esm.shutdownNow(e)).then(i -> e.shutdownNow());
         Mockito.when(conn.createChannel()).thenReturn(channel);
 
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
         consumer.doStart();
         assertFalse(e.isShutdown());
 
@@ -60,13 +77,20 @@ public class RabbitMQConsumerTest {
 
     @Test
     public void testStoppingConsumerShutdownConnection() throws Exception {
-        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+        ExecutorService es = Executors.newFixedThreadPool(3);
 
-        Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+        Mockito.when(endpoint.createExecutor()).thenReturn(es);
         Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
         Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
         Mockito.when(conn.createChannel()).thenReturn(channel);
+        Mockito.when(endpoint.getCamelContext()).thenReturn(ecc);
+        Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        Mockito.when(ecc.getExchangeFactory()).thenReturn(ef);
+        Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef);
+        Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm);
+        Mockito.when(esm.shutdownNow(es)).then(i -> es.shutdownNow());
 
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
         consumer.doStart();
         consumer.doStop();
 
@@ -76,18 +100,25 @@ public class RabbitMQConsumerTest {
     @Test
     public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
         AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);
+        ExecutorService es = Executors.newFixedThreadPool(3);
 
-        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
-
-        Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+        Mockito.when(endpoint.createExecutor()).thenReturn(es);
         Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
         Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
         Mockito.when(conn.createChannel()).thenReturn(channel);
         Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
         Mockito.when(channel.isOpen()).thenReturn(false);
+        Mockito.when(endpoint.getCamelContext()).thenReturn(ecc);
+        Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        Mockito.when(ecc.getExchangeFactory()).thenReturn(ef);
+        Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef);
+        Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm);
+        Mockito.when(esm.shutdownNow(es)).then(i -> es.shutdownNow());
+
         Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
         Mockito.doThrow(alreadyClosedException).when(channel).close();
 
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
         consumer.doStart();
         consumer.doStop();
 


[camel] 01/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 53930f287e0a2491baa5569a1882169bd805d318
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 23 07:04:14 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../mail/MailConsumerAuthenticatorTest.java        |  9 +++++++
 .../apache/camel/pgevent/PgEventConsumerTest.java  | 30 +++++++++++++++++++---
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java
index e4ea961..c4f18a7 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailConsumerAuthenticatorTest.java
@@ -23,12 +23,15 @@ import javax.mail.MessagingException;
 import javax.mail.PasswordAuthentication;
 import javax.mail.Session;
 
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeFactory;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
 /**
@@ -51,8 +54,11 @@ public class MailConsumerAuthenticatorTest {
 
         JavaMailSender sender = Mockito.mock(JavaMailSender.class);
         Processor processor = Mockito.mock(Processor.class);
+        ExtendedCamelContext ecc = Mockito.mock(ExtendedCamelContext.class);
+        ExchangeFactory ef = Mockito.mock(ExchangeFactory.class);
 
         MailEndpoint endpoint = new MailEndpoint();
+        endpoint.setCamelContext(ecc);
         MailConfiguration configuration = new MailConfiguration();
         configuration.setAuthenticator(authenticator);
         configuration.configureProtocol(protocol);
@@ -65,6 +71,9 @@ public class MailConsumerAuthenticatorTest {
         Session session = Session.getDefaultInstance(props, authenticator);
 
         when(sender.getSession()).thenReturn(session);
+        when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
 
         MailConsumer consumer = new MailConsumer(endpoint, processor, sender);
         try {
diff --git a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
index 82ff432..196de5a 100644
--- a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
+++ b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
@@ -20,16 +20,19 @@ import java.sql.PreparedStatement;
 
 import com.impossibl.postgres.api.jdbc.PGConnection;
 import com.impossibl.postgres.jdbc.PGDataSource;
-import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.pgevent.PgEventConsumer;
 import org.apache.camel.component.pgevent.PgEventEndpoint;
+import org.apache.camel.spi.ExchangeFactory;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -44,7 +47,13 @@ public class PgEventConsumerTest {
         PreparedStatement statement = mock(PreparedStatement.class);
         PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
         Processor processor = mock(Processor.class);
+        ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+        ExchangeFactory ef = mock(ExchangeFactory.class);
 
+        when(endpoint.getCamelContext()).thenReturn(ecc);
+        when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
         when(endpoint.getDatasource()).thenReturn(dataSource);
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("LISTEN camel")).thenReturn(statement);
@@ -64,7 +73,13 @@ public class PgEventConsumerTest {
         PreparedStatement statement = mock(PreparedStatement.class);
         PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
         Processor processor = mock(Processor.class);
+        ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+        ExchangeFactory ef = mock(ExchangeFactory.class);
 
+        when(endpoint.getCamelContext()).thenReturn(ecc);
+        when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
         when(endpoint.getDatasource()).thenReturn(dataSource);
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("LISTEN camel")).thenReturn(statement);
@@ -84,10 +99,17 @@ public class PgEventConsumerTest {
     public void testPgEventNotification() throws Exception {
         PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
         Processor processor = mock(Processor.class);
-        Exchange exchange = mock(Exchange.class);
+        ExtendedExchange exchange = mock(ExtendedExchange.class);
         Message message = mock(Message.class);
-
-        when(endpoint.createExchange()).thenReturn(exchange);
+        ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+        ExchangeFactory ef = mock(ExchangeFactory.class);
+
+        when(endpoint.getCamelContext()).thenReturn(ecc);
+        when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
+        when(ef.create(endpoint, false)).thenReturn(exchange);
+        when(exchange.adapt(ExtendedExchange.class)).thenReturn(exchange);
         when(exchange.getIn()).thenReturn(message);
 
         PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);


[camel] 03/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6b0720575b89587cebdf9c8ea44c6ce4bd25a9ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 23 07:16:47 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../org/apache/camel/component/kafka/KafkaConsumerTest.java | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index eff7f14..26188c4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.component.kafka;
 
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeFactory;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
@@ -31,9 +34,15 @@ public class KafkaConsumerTest {
     private KafkaComponent component = mock(KafkaComponent.class);
     private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
     private Processor processor = mock(Processor.class);
+    private ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
+    private ExchangeFactory ef = mock(ExchangeFactory.class);
 
     @Test
     public void consumerRequiresBootstrapServers() throws Exception {
+        when(endpoint.getCamelContext()).thenReturn(ecc);
+        when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
@@ -45,6 +54,10 @@ public class KafkaConsumerTest {
 
     @Test
     public void consumerOnlyRequiresBootstrapServers() throws Exception {
+        when(endpoint.getCamelContext()).thenReturn(ecc);
+        when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getBrokers()).thenReturn("localhost:2181");