You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/22 08:56:50 UTC

[camel] branch exchange-factory updated (401890f -> 966be0e)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 401890f  CAMEL-16222: PooledExchangeFactory experiment
     new ab56b04  CAMEL-16222: PooledExchangeFactory experiment
     new 966be0e  CAMEL-16222: PooledExchangeFactory experiment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/paho/mqtt5/PahoMqtt5Consumer.java    |  14 +-
 .../component/paho/mqtt5/PahoMqtt5Endpoint.java    |  13 --
 .../apache/camel/component/paho/PahoConsumer.java  |  14 +-
 .../apache/camel/component/paho/PahoEndpoint.java  |  14 --
 .../slot/PgReplicationSlotConsumer.java            |   2 +-
 .../camel/component/pgevent/PgEventConsumer.java   |  10 +-
 .../http/vertx/VertxPlatformHttpConsumer.java      |   4 +-
 .../camel/component/pubnub/PubNubConsumer.java     |   9 +-
 .../component/quickfixj/QuickfixjEndpoint.java     |  13 +-
 .../quickfixj/converter/QuickfixjConverters.java   |  23 ++++
 .../component/quickfixj/QuickfixjConsumerTest.java | 147 ---------------------
 11 files changed, 71 insertions(+), 192 deletions(-)
 delete mode 100644 components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java


[camel] 01/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ab56b04631203cf7b7b7f058419e1c188b263ff1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Feb 22 09:48:44 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../camel/component/paho/mqtt5/PahoMqtt5Consumer.java      | 14 +++++++++++++-
 .../camel/component/paho/mqtt5/PahoMqtt5Endpoint.java      | 13 -------------
 .../java/org/apache/camel/component/paho/PahoConsumer.java | 14 +++++++++++++-
 .../java/org/apache/camel/component/paho/PahoEndpoint.java | 14 --------------
 .../pg/replication/slot/PgReplicationSlotConsumer.java     |  2 +-
 .../apache/camel/component/pgevent/PgEventConsumer.java    | 10 +++++++---
 .../platform/http/vertx/VertxPlatformHttpConsumer.java     |  4 +---
 .../org/apache/camel/component/pubnub/PubNubConsumer.java  |  9 ++++-----
 8 files changed, 39 insertions(+), 41 deletions(-)

diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
index 57260b0..af8345a 100644
--- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
+++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
@@ -103,7 +103,7 @@ public class PahoMqtt5Consumer extends DefaultConsumer {
             @Override
             public void messageArrived(String topic, MqttMessage message) throws Exception {
                 LOG.debug("Message arrived on topic: {} -> {}", topic, message);
-                Exchange exchange = getEndpoint().createExchange(message, topic);
+                Exchange exchange = createExchange(message, topic);
 
                 getAsyncProcessor().process(exchange, doneSync -> {
                     // noop
@@ -144,4 +144,16 @@ public class PahoMqtt5Consumer extends DefaultConsumer {
         return (PahoMqtt5Endpoint) super.getEndpoint();
     }
 
+    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
+        Exchange exchange = createExchange(true);
+
+        PahoMqtt5Message paho = new PahoMqtt5Message(exchange.getContext(), mqttMessage);
+        paho.setBody(mqttMessage.getPayload());
+        paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic);
+        paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos());
+
+        exchange.setIn(paho);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java
index f558ac1..725df0d 100644
--- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java
+++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java
@@ -20,7 +20,6 @@ import java.util.UUID;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -83,18 +82,6 @@ public class PahoMqtt5Endpoint extends DefaultEndpoint {
         return topic;
     }
 
-    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
-        Exchange exchange = createExchange();
-
-        PahoMqtt5Message paho = new PahoMqtt5Message(exchange.getContext(), mqttMessage);
-        paho.setBody(mqttMessage.getPayload());
-        paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic);
-        paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos());
-
-        exchange.setIn(paho);
-        return exchange;
-    }
-
     protected MqttConnectionOptions createMqttConnectionOptions() {
         PahoMqtt5Configuration config = getConfiguration();
         MqttConnectionOptions options = new MqttConnectionOptions();
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 4e42547..f908efd 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -92,7 +92,7 @@ public class PahoConsumer extends DefaultConsumer {
             @Override
             public void messageArrived(String topic, MqttMessage message) throws Exception {
                 LOG.debug("Message arrived on topic: {} -> {}", topic, message);
-                Exchange exchange = getEndpoint().createExchange(message, topic);
+                Exchange exchange = createExchange(message, topic);
 
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override
@@ -136,4 +136,16 @@ public class PahoConsumer extends DefaultConsumer {
         return (PahoEndpoint) super.getEndpoint();
     }
 
+    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
+        Exchange exchange = createExchange(true);
+
+        PahoMessage paho = new PahoMessage(exchange.getContext(), mqttMessage);
+        paho.setBody(mqttMessage.getPayload());
+        paho.setHeader(PahoConstants.MQTT_TOPIC, topic);
+        paho.setHeader(PahoConstants.MQTT_QOS, mqttMessage.getQos());
+
+        exchange.setIn(paho);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 920cc28..d885204 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.paho;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -30,7 +29,6 @@ import org.apache.camel.util.ObjectHelper;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
 
@@ -80,18 +78,6 @@ public class PahoEndpoint extends DefaultEndpoint {
         return topic;
     }
 
-    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
-        Exchange exchange = createExchange();
-
-        PahoMessage paho = new PahoMessage(exchange.getContext(), mqttMessage);
-        paho.setBody(mqttMessage.getPayload());
-        paho.setHeader(PahoConstants.MQTT_TOPIC, topic);
-        paho.setHeader(PahoConstants.MQTT_QOS, mqttMessage.getQos());
-
-        exchange.setIn(paho);
-        return exchange;
-    }
-
     protected static MqttConnectOptions createMqttConnectOptions(PahoConfiguration config) {
         MqttConnectOptions mq = new MqttConnectOptions();
         if (ObjectHelper.isNotEmpty(config.getUserName()) && ObjectHelper.isNotEmpty(config.getPassword())) {
diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index 9b2dcc3..d911431 100644
--- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -125,7 +125,7 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer {
             throw e;
         }
 
-        Exchange exchange = this.endpoint.createExchange();
+        Exchange exchange = createExchange(true);
         exchange.setExchangeId(stream.getLastReceiveLSN().asString());
 
         Message message = exchange.getIn();
diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index 0c248a4..aebe8e3 100644
--- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -59,18 +59,22 @@ public class PgEventConsumer extends DefaultConsumer implements PGNotificationLi
             LOG.debug("Notification processId: {}, channel: {}, payload: {}", processId, channel, payload);
         }
 
-        Exchange exchange = endpoint.createExchange();
+        Exchange exchange = createExchange(false);
         Message msg = exchange.getIn();
         msg.setHeader("channel", channel);
         msg.setBody(payload);
 
         try {
             getProcessor().process(exchange);
-        } catch (Exception ex) {
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        if (exchange.getException() != null) {
             String cause = "Unable to process incoming notification from PostgreSQL: processId='" + processId + "', channel='"
                            + channel + "', payload='" + payload + "'";
-            getExceptionHandler().handleException(cause, ex);
+            getExceptionHandler().handleException(cause, exchange.getException());
         }
+        releaseExchange(exchange, false);
     }
 
     @Override
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index 8937de5..a65d4d4 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -64,11 +64,9 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
     private final String fileNameExtWhitelist;
     private Set<Method> methods;
     private String path;
-
     private Route route;
 
-    public VertxPlatformHttpConsumer(
-                                     PlatformHttpEndpoint endpoint,
+    public VertxPlatformHttpConsumer(PlatformHttpEndpoint endpoint,
                                      Processor processor,
                                      List<Handler<RoutingContext>> handlers) {
         super(endpoint, processor);
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
index 243aeb0..ad3e200 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
@@ -103,7 +103,7 @@ public class PubNubConsumer extends DefaultConsumer {
 
         @Override
         public void message(PubNub pubnub, PNMessageResult message) {
-            Exchange exchange = endpoint.createExchange();
+            Exchange exchange = createExchange(true);
             Message inmessage = exchange.getIn();
             inmessage.setBody(message);
             inmessage.setHeader(TIMETOKEN, message.getTimetoken());
@@ -111,13 +111,13 @@ public class PubNubConsumer extends DefaultConsumer {
             try {
                 getProcessor().process(exchange);
             } catch (Exception e) {
-                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                getExceptionHandler().handleException("Error processing exchange", e);
             }
         }
 
         @Override
         public void presence(PubNub pubnub, PNPresenceEventResult presence) {
-            Exchange exchange = endpoint.createExchange();
+            Exchange exchange = createExchange(true);
             Message inmessage = exchange.getIn();
             inmessage.setBody(presence);
             inmessage.setHeader(TIMETOKEN, presence.getTimetoken());
@@ -125,8 +125,7 @@ public class PubNubConsumer extends DefaultConsumer {
             try {
                 getProcessor().process(exchange);
             } catch (Exception e) {
-                exchange.setException(e);
-                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                getExceptionHandler().handleException("Error processing exchange", e);
             }
         }
 


[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 966be0eebd95c97599e3b05e8ad309c5e4217eeb
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Feb 22 09:56:22 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/quickfixj/QuickfixjEndpoint.java     |  13 +-
 .../quickfixj/converter/QuickfixjConverters.java   |  23 ++++
 .../component/quickfixj/QuickfixjConsumerTest.java | 147 ---------------------
 3 files changed, 32 insertions(+), 151 deletions(-)

diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
index 7725b0d..0f45d55 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
@@ -133,10 +133,15 @@ public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEvent
         if (this.sessionID == null || isMatching(sessionID)) {
             for (QuickfixjConsumer consumer : consumers) {
                 Exchange exchange
-                        = QuickfixjConverters.toExchange(this, sessionID, message, eventCategory, getExchangePattern());
-                consumer.onExchange(exchange);
-                if (exchange.getException() != null) {
-                    throw exchange.getException();
+                        = QuickfixjConverters.toExchange(consumer, sessionID, message, eventCategory, getExchangePattern());
+                try {
+                    consumer.onExchange(exchange);
+                    Exception cause = exchange.getException();
+                    if (cause != null) {
+                        throw cause;
+                    }
+                } finally {
+                    consumer.releaseExchange(exchange, false);
                 }
             }
         }
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
index 426efb4..65304ee 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.Converter;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -143,4 +144,26 @@ public final class QuickfixjConverters {
         return exchange;
     }
 
+    public static Exchange toExchange(
+            Consumer consumer, SessionID sessionID, Message message, QuickfixjEventCategory eventCategory,
+            ExchangePattern exchangePattern) {
+        Exchange exchange = consumer.createExchange(false);
+        exchange.setPattern(exchangePattern);
+
+        org.apache.camel.Message camelMessage = exchange.getIn();
+        camelMessage.setHeader(EVENT_CATEGORY_KEY, eventCategory);
+        camelMessage.setHeader(SESSION_ID_KEY, sessionID);
+
+        if (message != null) {
+            try {
+                camelMessage.setHeader(MESSAGE_TYPE_KEY, message.getHeader().getString(MsgType.FIELD));
+            } catch (FieldNotFound e) {
+                LOG.warn("Message type field not found in QFJ message: {}, continuing...", message);
+            }
+        }
+        camelMessage.setBody(message);
+
+        return exchange;
+    }
+
 }
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
deleted file mode 100644
index 4e9360a..0000000
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.hamcrest.CoreMatchers;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-import quickfix.FixVersions;
-import quickfix.Message;
-import quickfix.Session;
-import quickfix.SessionID;
-import quickfix.field.BeginString;
-import quickfix.field.SenderCompID;
-import quickfix.field.TargetCompID;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.isA;
-
-public class QuickfixjConsumerTest {
-    private Exchange mockExchange;
-    private Processor mockProcessor;
-    private QuickfixjEndpoint mockEndpoint;
-    private Message inboundFixMessage;
-
-    @BeforeEach
-    public void setUp() {
-
-        mockExchange = Mockito.mock(Exchange.class);
-        org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
-        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
-
-        inboundFixMessage = new Message();
-        inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44);
-        inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
-        inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
-        Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage);
-
-        mockProcessor = Mockito.mock(Processor.class);
-        mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
-        Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
-    }
-
-    @Test
-    public void processExchangeOnlyWhenStarted() throws Exception {
-        QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor);
-
-        assertThat("Consumer should not be automatically started",
-                consumer.isStarted(), CoreMatchers.is(false));
-
-        consumer.onExchange(mockExchange);
-
-        // No expected interaction with processor since component is not started
-        Mockito.verifyNoInteractions(mockProcessor);
-
-        consumer.start();
-        Mockito.verify(mockEndpoint).ensureInitialized();
-        assertThat(consumer.isStarted(), CoreMatchers.is(true));
-
-        consumer.onExchange(mockExchange);
-
-        // Second message should be processed
-        Mockito.verify(mockProcessor).process(isA(Exchange.class));
-    }
-
-    @Test
-    public void setExceptionOnExchange() throws Exception {
-        QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor);
-        consumer.start();
-
-        Throwable exception = new Exception("Throwable for test");
-        Mockito.doThrow(exception).when(mockProcessor).process(mockExchange);
-
-        // Simulate a message from the FIX engine
-        consumer.onExchange(mockExchange);
-
-        Mockito.verify(mockExchange).setException(exception);
-    }
-
-    @Test
-    public void setExceptionOnInOutExchange() throws Exception {
-        org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class);
-        org.apache.camel.Message mockCamelInMessage = Mockito.mock(org.apache.camel.Message.class);
-        SessionID mockSessionId = Mockito.mock(SessionID.class);
-
-        QuickfixjConsumer consumer = Mockito.spy(new QuickfixjConsumer(mockEndpoint, mockProcessor));
-        Mockito.doReturn(null).when(consumer).getSession(mockSessionId);
-
-        Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
-        Mockito.when(mockExchange.hasOut()).thenReturn(true);
-        Mockito.when(mockExchange.getMessage()).thenReturn(mockCamelOutMessage);
-        Message outboundFixMessage = new Message();
-        Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
-        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelInMessage);
-        Mockito.when(mockCamelInMessage.getHeader("SessionID", SessionID.class)).thenReturn(mockSessionId);
-
-        consumer.start();
-
-        // Simulate a message from the FIX engine
-        consumer.onExchange(mockExchange);
-
-        Mockito.verify(mockExchange).setException(isA(IllegalStateException.class));
-    }
-
-    @Test
-    public void processInOutExchange() throws Exception {
-        org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class);
-        org.apache.camel.Message mockCamelInMessage = Mockito.mock(org.apache.camel.Message.class);
-        SessionID mockSessionId = Mockito.mock(SessionID.class);
-        Session mockSession = Mockito.mock(Session.class);
-
-        QuickfixjConsumer consumer = Mockito.spy(new QuickfixjConsumer(mockEndpoint, mockProcessor));
-        Mockito.doReturn(mockSession).when(consumer).getSession(mockSessionId);
-        Mockito.doReturn(true).when(mockSession).send(isA(Message.class));
-
-        Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
-        Mockito.when(mockExchange.hasOut()).thenReturn(true);
-        Mockito.when(mockExchange.getMessage()).thenReturn(mockCamelOutMessage);
-        Message outboundFixMessage = new Message();
-        Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
-        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelInMessage);
-        Mockito.when(mockCamelInMessage.getHeader("SessionID", SessionID.class)).thenReturn(mockSessionId);
-
-        consumer.start();
-
-        consumer.onExchange(mockExchange);
-        Mockito.verify(mockExchange, Mockito.never()).setException(isA(Exception.class));
-        Mockito.verify(mockSession).send(outboundFixMessage);
-    }
-}