You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by qu...@apache.org on 2018/03/23 15:56:10 UTC

[camel] branch master updated (9dbf5eb -> 97ae610)

This is an automated email from the ASF dual-hosted git repository.

quinn pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 9dbf5eb  CAMEL-12334: change defaults and add support fo...
     new 14d2cf9  CAMEL-12325: Correct idleTimeout behaviour for the MllpTcpClientProducer
     new 97ae610  CAMEL-12325: logging cleanup

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/mllp/MllpEndpoint.java  |   8 ++
 .../component/mllp/MllpTcpClientProducer.java      | 160 +++++++++++++--------
 .../component/mllp/MllpTcpServerConsumer.java      |  36 +++--
 .../component/mllp/internal/MllpSocketBuffer.java  |  87 +++++------
 .../MllpTcpClientProducerConnectionErrorTest.java  |  53 ++++---
 .../mllp/internal/MllpSocketBufferTest.java        |  16 +++
 .../mllp/internal/MllpSocketBufferWriteTest.java   |  12 +-
 7 files changed, 230 insertions(+), 142 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
quinn@apache.org.

[camel] 02/02: CAMEL-12325: logging cleanup

Posted by qu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 97ae6103acd5c28626d14045502e42df4e0b91af
Author: Quinn Stevenson <qu...@apache.org>
AuthorDate: Fri Mar 23 07:56:07 2018 -0600

    CAMEL-12325: logging cleanup
---
 .../apache/camel/component/mllp/MllpEndpoint.java  |   3 +-
 .../component/mllp/MllpTcpClientProducer.java      | 123 +++++++++++----------
 .../component/mllp/MllpTcpServerConsumer.java      |  36 +++---
 .../component/mllp/internal/MllpSocketBuffer.java  |  87 ++++++++-------
 .../mllp/internal/MllpSocketBufferTest.java        |  16 +++
 .../mllp/internal/MllpSocketBufferWriteTest.java   |  12 +-
 6 files changed, 154 insertions(+), 123 deletions(-)

diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index dbadfa8..c05b8b6 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -53,8 +53,7 @@ import org.slf4j.LoggerFactory;
  * <p/>
  */
 @ManagedResource(description = "MLLP Endpoint")
-// @UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
-@UriEndpoint(scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
+@UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
 public class MllpEndpoint extends DefaultEndpoint {
     // Use constants from MllpProtocolConstants
     @Deprecated()
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index df5705a..57bd1d3 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -18,30 +18,33 @@
 package org.apache.camel.component.mllp;
 
 import java.io.IOException;
+
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
-import java.nio.charset.Charset;
+
 import java.util.Date;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
+
 import org.apache.camel.component.mllp.internal.Hl7Util;
 import org.apache.camel.component.mllp.internal.MllpSocketBuffer;
+
 import org.apache.camel.impl.DefaultProducer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
 
 /**
@@ -49,12 +52,12 @@ import org.slf4j.MDC;
  */
 @ManagedResource(description = "MLLP Producer")
 public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
-    Socket socket;
-
+    final Logger log;
     final MllpSocketBuffer mllpBuffer;
 
+    Socket socket;
+
     ScheduledExecutorService idleTimeoutExecutor;
-    // long lastProcessCallTicks = -1;
 
     private String cachedLocalAddress;
     private String cachedRemoteAddress;
@@ -62,6 +65,9 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
 
     public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException {
         super(endpoint);
+
+        log = LoggerFactory.getLogger(String.format("%s.%s.%d", this.getClass().getName(), endpoint.getHostname(), endpoint.getPort()));
+
         log.trace("Constructing MllpTcpClientProducer for endpoint URI {}", endpoint.getEndpointUri());
 
         mllpBuffer = new MllpSocketBuffer(endpoint);
@@ -127,7 +133,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
 
     @Override
     public synchronized void process(Exchange exchange) throws MllpException {
-        log.trace("Processing Exchange {} for {}", exchange.getExchangeId(), socket);
+        log.trace("process({}) [{}] - entering", exchange.getExchangeId(), socket);
         getEndpoint().updateLastConnectionActivityTicks();
 
         Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -150,7 +156,8 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
             byte[] hl7MessageBytes = null;
             Object messageBody = message.getBody();
             if (messageBody == null) {
-                exchange.setException(new MllpInvalidMessageException("message body is null", hl7MessageBytes));
+                String exceptionMessage = String.format("process(%s) [%s] - message body is null", exchange.getExchangeId(), socket);
+                exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes));
                 return;
             } else if (messageBody instanceof byte[]) {
                 hl7MessageBytes = (byte[]) messageBody;
@@ -162,99 +169,94 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                 }
             }
 
-            log.debug("Sending message to external system {}", socket);
+            log.debug("process({}) [{}] - sending message to external system", exchange.getExchangeId(), socket);
 
             try {
                 mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                 mllpBuffer.writeTo(socket);
             } catch (MllpSocketException writeEx) {
                 // Connection may have been reset - try one more time
-                log.debug("Exception encountered writing payload to {} - attempting reconnect", writeEx, socket);
+                log.debug("process({}) [{}] - exception encountered writing payload - attempting reconnect", exchange.getExchangeId(), socket, writeEx);
                 try {
                     checkConnection();
-                    log.trace("Reconnected succeeded - resending payload to {}", socket);
+                    log.trace("process({}) [{}] - reconnected succeeded - resending payload", exchange.getExchangeId(), socket);
                     try {
                         mllpBuffer.writeTo(socket);
                     } catch (MllpSocketException retryWriteEx) {
-                        log.warn("Exception encountered attempting to write payload to {} after reconnect - sending original exception to exchange", socket, retryWriteEx);
-                        exchange.setException(new MllpWriteException("Exception encountered writing payload after reconnect", mllpBuffer.toByteArrayAndReset(), retryWriteEx));
+                        String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to write payload after reconnect", exchange.getExchangeId(), socket);
+                        log.warn(exceptionMessage, retryWriteEx);
+                        exchange.setException(new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), retryWriteEx));
                     }
                 } catch (IOException reconnectEx) {
-                    log.warn("Exception encountered attempting to reconnect - sending original exception to exchange", reconnectEx);
-                    exchange.setException(new MllpWriteException("Exception encountered writing payload", mllpBuffer.toByteArrayAndReset(), writeEx));
+                    String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to reconnect", exchange.getExchangeId(), socket);
+                    log.warn(exceptionMessage, reconnectEx);
+                    exchange.setException(new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), writeEx));
                     mllpBuffer.resetSocket(socket);
                 }
             }
 
             if (exchange.getException() == null) {
-                log.debug("Reading acknowledgement from external system {}", socket);
+                log.debug("process({}) [{}] - reading acknowledgement from external system", exchange.getExchangeId(), socket);
                 try {
                     mllpBuffer.reset();
                     mllpBuffer.readFrom(socket);
                 } catch (MllpSocketException receiveAckEx) {
                     // Connection may have been reset - try one more time
-                    log.debug("Exception encountered reading acknowledgement from {} - attempting reconnect", socket, receiveAckEx);
+                    log.debug("process({}) [{}] - exception encountered reading acknowledgement - attempting reconnect", exchange.getExchangeId(), socket, receiveAckEx);
                     try {
                         checkConnection();
                     } catch (IOException reconnectEx) {
-                        log.warn("Exception encountered attempting to reconnect after acknowledgement read failure - sending original acknowledgement exception to exchange", reconnectEx);
-                        exchange.setException(new MllpAcknowledgementReceiveException("Exception encountered reading acknowledgement", hl7MessageBytes, receiveAckEx));
+                        String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to reconnect after acknowledgement read failure",
+                            exchange.getExchangeId(), socket);
+                        log.warn(exceptionMessage, reconnectEx);
+                        exchange.setException(new MllpAcknowledgementReceiveException(exceptionMessage, hl7MessageBytes, receiveAckEx));
                         mllpBuffer.resetSocket(socket);
                     }
 
                     if (exchange.getException() == null) {
-                        log.trace("Reconnected succeeded - resending payload to {}", socket);
+                        log.trace("process({}) [{}] - resending payload after successful reconnect", exchange.getExchangeId(), socket);
                         try {
                             mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                             mllpBuffer.writeTo(socket);
                         } catch (MllpSocketException writeRetryEx) {
-                            log.warn("Exception encountered attempting to write payload to {} after read failure and successful reconnect - sending original exception to exchange",
-                                socket, writeRetryEx);
-                            exchange.setException(new MllpWriteException("Exception encountered writing payload after read failure and reconnect", hl7MessageBytes, receiveAckEx));
+                            String exceptionMessage = String.format("process(%s) [%s] - exception encountered attempting to write payload after read failure and successful reconnect",
+                                exchange.getExchangeId(), socket);
+                            log.warn(exceptionMessage, writeRetryEx);
+                            exchange.setException(new MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx));
                         }
 
                         if (exchange.getException() == null) {
-                            log.trace("Resend succeeded - reading acknowledgement from {}", socket);
+                            log.trace("process({}) [{}] - resend succeeded - reading acknowledgement from {}", exchange.getExchangeId(), socket);
                             try {
                                 mllpBuffer.reset();
                                 mllpBuffer.readFrom(socket);
                             } catch (MllpSocketException secondReceiveEx) {
-                                if (mllpBuffer.isEmpty()) {
-                                    log.warn("Exception encountered reading acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx);
-                                    Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving Acknowledgement", hl7MessageBytes, receiveAckEx);
-                                    exchange.setException(exchangeEx);
-                                } else {
-                                    byte[] partialAcknowledgment = mllpBuffer.toByteArray();
-                                    mllpBuffer.reset();
-                                    log.warn("Exception encountered reading a complete acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx);
-                                    Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving complete Acknowledgement",
-                                        hl7MessageBytes, partialAcknowledgment, receiveAckEx);
-                                    exchange.setException(exchangeEx);
-                                }
+                                String exceptionMessageFormat = mllpBuffer.isEmpty()
+                                    ? "process(%s) [%s] - exception encountered reading MLLP Acknowledgement after successful reconnect and resend"
+                                    : "process(%s) [%s] - exception encountered reading complete MLLP Acknowledgement after successful reconnect and resend";
+                                String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+                                log.warn(exceptionMessage, secondReceiveEx);
+                                // Send the original exception to the exchange
+                                exchange.setException(new MllpAcknowledgementReceiveException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx));
                             } catch (SocketTimeoutException secondReadTimeoutEx) {
-                                if (mllpBuffer.isEmpty()) {
-                                    log.warn("Timeout receiving HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx);
-                                    exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement after successful reconnect",
-                                        hl7MessageBytes, secondReadTimeoutEx));
-                                } else {
-                                    log.warn("Timeout receiving complete HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx);
-                                    exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement after successful reconnect",
-                                        hl7MessageBytes, mllpBuffer.toByteArray(), secondReadTimeoutEx));
-                                    mllpBuffer.reset();
-                                }
+                                String exceptionMessageFormat = mllpBuffer.isEmpty()
+                                    ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment after successful reconnect and resend"
+                                    : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment after successful reconnect and resend";
+                                String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+                                log.warn(exceptionMessage, secondReadTimeoutEx);
+                                // Send the original exception to the exchange
+                                exchange.setException(new MllpAcknowledgementTimeoutException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx));
                                 mllpBuffer.resetSocket(socket);
                             }
                         }
                     }
                 } catch (SocketTimeoutException timeoutEx) {
-                    if (mllpBuffer.isEmpty()) {
-                        log.warn("Timeout receiving HL7 Acknowledgment from {}", socket, timeoutEx);
-                        exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement", hl7MessageBytes, timeoutEx));
-                    } else {
-                        log.warn("Timeout receiving complete HL7 Acknowledgment from {}", socket, timeoutEx);
-                        exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement", hl7MessageBytes, mllpBuffer.toByteArray(), timeoutEx));
-                        mllpBuffer.reset();
-                    }
+                    String exceptionMessageFormat = mllpBuffer.isEmpty()
+                        ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment"
+                        : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment";
+                    String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
+                    log.warn(exceptionMessage, timeoutEx);
+                    exchange.setException(new MllpAcknowledgementTimeoutException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), timeoutEx));
                     mllpBuffer.resetSocket(socket);
                 }
 
@@ -262,7 +264,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                     if (mllpBuffer.hasCompleteEnvelope()) {
                         byte[] acknowledgementBytes = mllpBuffer.toMllpPayload();
 
-                        log.debug("Populating message headers with the acknowledgement from the external system {}", socket);
+                        log.debug("process({}) [{}] - populating message headers with the acknowledgement from the external system", exchange.getExchangeId(), socket);
                         message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
                         if (acknowledgementBytes != null && acknowledgementBytes.length > 0) {
                             message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, getConfiguration().getCharset(exchange, acknowledgementBytes)));
@@ -278,7 +280,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                         }
 
                         if (exchange.getException() == null) {
-                            log.debug("Processing the acknowledgement from the external system {}", socket);
+                            log.debug("process({}) [{}] - processing the acknowledgement from the external system", exchange.getExchangeId(), socket);
                             try {
                                 message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, processAcknowledgment(hl7MessageBytes, acknowledgementBytes));
                             } catch (MllpNegativeAcknowledgementException nackEx) {
@@ -289,18 +291,21 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                             getEndpoint().checkAfterSendProperties(exchange, socket, log);
                         }
                     } else {
-                        exchange.setException(new MllpInvalidAcknowledgementException("Invalid acknowledgement received", hl7MessageBytes, mllpBuffer.toByteArrayAndReset()));
+                        String exceptionMessage = String.format("process(%s) [%s] - invalid acknowledgement received", exchange.getExchangeId(), socket);
+                        exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset()));
                     }
                 }
             }
 
         } catch (IOException ioEx) {
-            log.debug("Exception encountered checking connection {}", socket, ioEx);
+            log.debug("process({}) [{}] - IOException encountered checking connection", exchange.getExchangeId(), socket, ioEx);
             exchange.setException(ioEx);
             mllpBuffer.resetSocket(socket);
         } finally {
             mllpBuffer.reset();
         }
+
+        log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), socket);
     }
 
     private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException {
@@ -328,7 +333,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                             // Found the beginning of the MSA - the next two bytes should be our acknowledgement code
                             msaStartIndex = i + 1;
                             if (bA != hl7AcknowledgementBytes[i + 5] && bC != hl7AcknowledgementBytes[i + 5]) {
-                                String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+                                String errorMessage = String.format("processAcknowledgment(hl7MessageBytes[%d], hl7AcknowledgementBytes[%d]) - unsupported acknowledgement type: '%s'", hl7MessageBytes == null ? -1 : hl7MessageBytes.length, hl7AcknowledgementBytes.length, new String(hl7AcknowledgementBytes, i + 5, 2));
                                 throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
                             } else {
                                 switch (hl7AcknowledgementBytes[i + 6]) {
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 4344dfb..06136f7 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -52,12 +52,16 @@ import org.apache.camel.converter.IOConverter;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException;
 import org.apache.camel.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * The MLLP consumer.
  */
 @ManagedResource(description = "MLLP Producer")
 public class MllpTcpServerConsumer extends DefaultConsumer {
+    final Logger log;
     final ExecutorService validationExecutor;
     final ExecutorService consumerExecutor;
 
@@ -69,7 +73,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
     public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        log.trace("MllpTcpServerConsumer(endpoint, processor)");
+        log = LoggerFactory.getLogger(String.format("%s.%d", this.getClass().getName(), endpoint.getPort()));
 
         validationExecutor = Executors.newCachedThreadPool();
         consumerExecutor = new ThreadPoolExecutor(1, getConfiguration().getMaxConcurrentConsumers(), getConfiguration().getAcceptTimeout(), TimeUnit.MILLISECONDS, new SynchronousQueue<>());
@@ -117,7 +121,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
-        log.debug("doStop()");
+        log.trace("doStop()");
 
         // Close any client sockets that are currently open
         for (TcpSocketConsumerRunnable consumerClientSocketThread : consumerRunnables.keySet()) {
@@ -193,10 +197,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         TcpServerConsumerValidationRunnable client = new TcpServerConsumerValidationRunnable(this, clientSocket, mllpBuffer);
 
         try {
-            log.debug("Validating consumer for Socket {}", clientSocket);
+            log.debug("validateConsumer({}) - submitting client for validation", clientSocket);
             validationExecutor.submit(client);
         } catch (RejectedExecutionException rejectedExecutionEx) {
-            log.warn("Cannot validate consumer - max validations already active");
+            log.warn("validateConsumer({}) - cannot validate client - max validations already active", clientSocket);
             mllpBuffer.resetSocket(clientSocket);
         }
     }
@@ -211,11 +215,11 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
         consumerRunnables.put(client, System.currentTimeMillis());
         try {
-            log.info("Starting consumer for Socket {}", clientSocket);
+            log.info("startConsumer({}) - starting consumer", clientSocket);
             consumerExecutor.submit(client);
             getEndpoint().updateLastConnectionEstablishedTicks();
         } catch (RejectedExecutionException rejectedExecutionEx) {
-            log.warn("Cannot start consumer - max consumers already active");
+            log.warn("startConsumer({}) - cannot start consumer - max consumers already active", clientSocket);
             mllpBuffer.resetSocket(clientSocket);
         }
     }
@@ -227,7 +231,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         consumerRunnables.put(consumerRunnable, now);
 
         // Send the message on to Camel for processing and wait for the response
-        log.debug("Populating the exchange with received message");
+        log.debug("processMessage(hl7MessageBytes[{}], {}) - populating the exchange with received payload", hl7MessageBytes == null ? -1 : hl7MessageBytes.length, consumerRunnable.getSocket());
         Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
         if (getConfiguration().hasCharsetName()) {
             exchange.setProperty(Exchange.CHARSET_NAME, getConfiguration().getCharsetName());
@@ -267,17 +271,17 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                 message.setBody(hl7MessageBytes, byte[].class);
             }
 
-            log.debug("Calling processor");
+            log.debug("processMessage(hl7MessageBytes[{}], {}) - calling processor", hl7MessageBytes == null ? -1 : hl7MessageBytes.length, consumerRunnable.getSocket());
             try {
                 getProcessor().process(exchange);
                 sendAcknowledgement(hl7MessageBytes, exchange, consumerRunnable);
             } catch (Exception unexpectedEx) {
-                String resetMessage = "Unexpected exception processing exchange";
+                String resetMessage = "processMessage(byte[], TcpSocketConsumerRunnable) - Unexpected exception processing exchange";
                 consumerRunnable.resetSocket(resetMessage);
                 getExceptionHandler().handleException(resetMessage, exchange, unexpectedEx);
             }
         } catch (Exception uowEx) {
-            getExceptionHandler().handleException("Unexpected exception creating Unit of Work", exchange, uowEx);
+            getExceptionHandler().handleException("processMessage(byte[], TcpSocketConsumerRunnable) - Unexpected exception creating Unit of Work", exchange, uowEx);
         } finally {
             if (exchange != null) {
                 doneUoW(exchange);
@@ -389,7 +393,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
 
     void sendAcknowledgement(byte[] originalHl7MessageBytes, Exchange exchange, TcpSocketConsumerRunnable consumerRunnable) {
-        log.trace("entering sendAcknowledgement(byte[], Exchange)");
+        log.trace("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - entering",
+            originalHl7MessageBytes == null ? -1 : originalHl7MessageBytes.length, exchange.getExchangeId(), consumerRunnable.getSocket());
 
         getEndpoint().checkBeforeSendProperties(exchange, consumerRunnable.getSocket(), log);
 
@@ -519,6 +524,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
         Charset charset = getConfiguration().getCharset(exchange);
 
+        // TODO:  re-evaluate this - it seems that the MLLP buffer should be populated by now
         if (consumerRunnable.getMllpBuffer().hasCompleteEnvelope()) {
             // The mllpBuffer will be used if bufferWrites is set or if auto acknowledgement is used
             message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, consumerRunnable.getMllpBuffer().toMllpPayload());
@@ -526,7 +532,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
             // Send the acknowledgement
             if (log.isDebugEnabled()) {
-                log.debug("Sending Acknowledgement: {}", consumerRunnable.getMllpBuffer().toPrintFriendlyHl7String());
+                log.debug("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - Sending Acknowledgement: {}",
+                    originalHl7MessageBytes == null ? -1 : originalHl7MessageBytes.length, exchange.getExchangeId(), consumerRunnable.getSocket(),
+                    consumerRunnable.getMllpBuffer().toPrintFriendlyHl7String());
             }
 
             try {
@@ -545,7 +553,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
             // Send the acknowledgement
             if (log.isDebugEnabled()) {
-                log.debug("Sending Acknowledgement: {}", Hl7Util.convertToPrintFriendlyString(acknowledgementMessageBytes));
+                log.debug("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - Sending Acknowledgement: {}",
+                    originalHl7MessageBytes == null ? -1 : originalHl7MessageBytes.length, exchange.getExchangeId(), consumerRunnable.getSocket(),
+                    Hl7Util.convertToPrintFriendlyString(acknowledgementMessageBytes));
             }
 
             try {
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
index 1d7daf6..3071351 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
@@ -90,29 +90,29 @@ public class MllpSocketBuffer {
         if (sourceBytes != null && sourceBytes.length > 0) {
             if (offset < 0) {
                 throw new IndexOutOfBoundsException(
-                    String.format("offset <%d> is less than zero",
-                        offset));
+                    String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is less than zero",
+                        sourceBytes.length, offset, writeCount));
             }
             if (offset > sourceBytes.length) {
                 throw new IndexOutOfBoundsException(
-                    String.format("offset <%d> is greater than write count <%d>",
-                        offset, writeCount));
+                    String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset is greater than write count",
+                        sourceBytes.length, offset, writeCount));
             }
 
             if (writeCount < 0) {
                 throw new IndexOutOfBoundsException(
-                    String.format("write count <%d> is less than zero",
-                        writeCount));
+                    String.format("write(byte[%d], offset[%d], writeCount[%d]) - write count is less than zero",
+                        sourceBytes.length, offset, writeCount));
             }
             if (writeCount > sourceBytes.length) {
                 throw new IndexOutOfBoundsException(
-                    String.format("write count <%d> is greater than length of the source byte[] <%d>",
-                        writeCount, sourceBytes.length));
+                    String.format("write(byte[%d], offset[%d], writeCount[%d]) - write count is greater than length of the source byte[]",
+                        sourceBytes.length, offset, writeCount));
             }
             if ((offset + writeCount) - sourceBytes.length > 0) {
                 throw new IndexOutOfBoundsException(
-                    String.format("offset <%d> plus write count <%d> is <%d> is greater than length <%d> of the source byte[]",
-                    offset, writeCount, offset + writeCount, sourceBytes.length));
+                    String.format("write(byte[%d], offset[%d], writeCount[%d]) - offset plus write count <%d> is greater than length of the source byte[]",
+                        sourceBytes.length, offset, writeCount, offset + writeCount));
             }
 
             ensureCapacity(writeCount);
@@ -144,17 +144,17 @@ public class MllpSocketBuffer {
 
         if (hl7Payload != null && hl7Payload.length > 0) {
             if (hl7Payload[0] != MllpProtocolConstants.START_OF_BLOCK) {
-                write(MllpProtocolConstants.START_OF_BLOCK);
+                openMllpEnvelope();
             }
 
             write(hl7Payload, offset, length);
 
             if (!hasCompleteEnvelope()) {
-                write(MllpProtocolConstants.PAYLOAD_TERMINATOR);
+                closeMllpEnvelope();
             }
         } else {
-            write(MllpProtocolConstants.START_OF_BLOCK);
-            write(MllpProtocolConstants.PAYLOAD_TERMINATOR);
+            openMllpEnvelope();
+            closeMllpEnvelope();
         }
     }
 
@@ -176,7 +176,7 @@ public class MllpSocketBuffer {
 
     public synchronized void readFrom(Socket socket, int receiveTimeout, int readTimeout) throws MllpSocketException, SocketTimeoutException {
         if (socket != null && socket.isConnected() && !socket.isClosed()) {
-            log.trace("Entering readFrom for {} ...", socket);
+            log.trace("readFrom({}, {}, {}) - entering", socket, receiveTimeout, readTimeout);
             ensureCapacity(MIN_BUFFER_SIZE);
 
             try {
@@ -197,27 +197,27 @@ public class MllpSocketBuffer {
             } catch (SocketTimeoutException timeoutEx) {
                 throw timeoutEx;
             } catch (IOException ioEx) {
-                final String exceptionMessage = "Exception encountered reading Socket";
+                final String exceptionMessage = String.format("readFrom(%s, %d, %d) - IOException encountered", socket, receiveTimeout, readTimeout);
                 resetSocket(socket, exceptionMessage);
                 throw new MllpSocketException(exceptionMessage, ioEx);
             } finally {
                 if (size() > 0 && !hasCompleteEnvelope()) {
                     if (!hasEndOfData() && hasEndOfBlock() && endOfBlockIndex < size() - 1) {
-                        log.warn("readFrom {} exiting with partial payload {}", socket, Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1));
+                        log.warn("readFrom({}, {}, {}) - exiting with partial payload {}", socket, receiveTimeout, readTimeout, Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1));
                     }
                 }
             }
 
         } else {
-            log.warn("Socket is invalid - no data read");
+            log.warn("readFrom({}, {}, {}) - no data read because Socket is invalid", socket, receiveTimeout, readTimeout);
         }
 
-        log.trace("Exiting readFrom ...");
+        log.trace("readFrom({}, {}, {}) - exiting", socket, receiveTimeout, readTimeout);
     }
 
     public synchronized void writeTo(Socket socket) throws MllpSocketException {
         if (socket != null && socket.isConnected() && !socket.isClosed()) {
-            log.trace("Entering writeTo for {} ...", socket);
+            log.trace("writeTo({}) - entering", socket);
             if (!isEmpty()) {
                 try {
                     OutputStream socketOutputStream = socket.getOutputStream();
@@ -238,18 +238,18 @@ public class MllpSocketBuffer {
                     }
                     socketOutputStream.flush();
                 } catch (IOException ioEx) {
-                    final String exceptionMessage = "Exception encountered writing Socket";
+                    final String exceptionMessage = String.format("writeTo({}) - IOException encountered", socket);
                     resetSocket(socket, exceptionMessage);
                     throw new MllpSocketException(exceptionMessage, ioEx);
                 }
             } else {
-                log.warn("Ignoring call to writeTo(byte[] payload) for {} - MLLP payload is null or empty", socket);
+                log.warn("writeTo({}) - no data written because buffer is empty", socket);
             }
         } else {
-            log.warn("Socket is invalid - no data written");
+            log.warn("writeTo({}) - no data written because Socket is invalid", socket);
         }
 
-        log.trace("Exiting writeTo ...");
+        log.trace("writeTo({}) - exiting", socket);
     }
 
     public synchronized byte[] toByteArray() {
@@ -287,9 +287,9 @@ public class MllpSocketBuffer {
                 if (Charset.isSupported(charsetName)) {
                     return toString(Charset.forName(charsetName));
                 }
-                log.warn("Unsupported character set name {} - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset());
+                log.warn("toString(charsetName[{}]) - unsupported character set name - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset());
             } catch (Exception charsetEx) {
-                log.warn("Ignoring exception encountered determining character set for name {} - using the MLLP default character set {}",
+                log.warn("toString(charsetName[{}]) - ignoring exception encountered determining character set - using the MLLP default character set {}",
                     charsetName, MllpComponent.getDefaultCharset(), charsetEx);
             }
 
@@ -355,9 +355,9 @@ public class MllpSocketBuffer {
                 if (Charset.isSupported(charsetName)) {
                     return toHl7String(Charset.forName(charsetName));
                 }
-                log.warn("Unsupported character set name {} - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset());
+                log.warn("toHl7String(charsetName[{}]) - unsupported character set name - using the MLLP default character set {}", charsetName, MllpComponent.getDefaultCharset());
             } catch (Exception charsetEx) {
-                log.warn("Ignoring exception encountered determining character set for name {} - using the MLLP default character set {}",
+                log.warn("toHl7String(charsetName[{}]) - ignoring exception encountered determining character set for name - using the MLLP default character set {}",
                     charsetName, MllpComponent.getDefaultCharset(), charsetEx);
             }
         }
@@ -612,12 +612,14 @@ public class MllpSocketBuffer {
     }
 
     void readSocketInputStream(InputStream socketInputStream, Socket socket) throws MllpSocketException, SocketTimeoutException {
-        log.trace("Entering readSocketInputStream - size = {}", size());
+        log.trace("readSocketInputStream(socketInputStream, {}) - entering with initial buffer size = {}", socket, size());
         try {
             int readCount = socketInputStream.read(buffer, availableByteCount, buffer.length - availableByteCount);
             if (readCount == MllpProtocolConstants.END_OF_STREAM) {
+                final String exceptionMessage = String.format("readSocketInputStream(socketInputStream, %s) - END_OF_STREAM returned from SocketInputStream.read(byte[%d], %d, %d)",
+                    socket, buffer.length, availableByteCount, buffer.length - availableByteCount);
                 resetSocket(socket);
-                throw new MllpSocketException("END_OF_STREAM returned from SocketInputStream.read(byte[], off, len)");
+                throw new MllpSocketException(exceptionMessage);
             }
             if (readCount > 0) {
                 for (int i = 0; (startOfBlockIndex == -1 || endOfBlockIndex == -1) && i < readCount; ++i) {
@@ -626,22 +628,20 @@ public class MllpSocketBuffer {
                 availableByteCount += readCount;
 
                 if (hasStartOfBlock()) {
-                    log.trace("Read {} bytes for a total of {} bytes", readCount, availableByteCount);
+                    log.trace("readSocketInputStream(socketInputStream, {}) - read {} bytes for a total of {} bytes", socket, readCount, availableByteCount);
                 } else {
-                    log.warn("Ignoring {} bytes received before START_OF_BLOCK", size(), toPrintFriendlyStringAndReset());
+                    log.warn("readSocketInputStream(socketInputStream, {}) - ignoring {} bytes received before START_OF_BLOCK", socket, size(), toPrintFriendlyStringAndReset());
                 }
             }
         } catch (SocketTimeoutException timeoutEx) {
             throw timeoutEx;
-        } catch (SocketException socketEx) {
-            resetSocket(socket);
-            throw new MllpSocketException("SocketException encountered in readSocketInputStream", socketEx);
         } catch (IOException ioEx) {
-            final String exceptionMessage = "IOException thrown from SocketInputStream.read(byte[], off, len)";
+            final String exceptionMessage = String.format("readSocketInputStream(socketInputStream, %s) - IOException thrown from SocketInputStream.read(byte[%d], %d, %d) from %s",
+                socket, buffer.length, availableByteCount, buffer.length - availableByteCount, socket);
             resetSocket(socket);
-            throw new MllpSocketException("IOException thrown from SocketInputStream.read(byte[], off, len)", ioEx);
+            throw new MllpSocketException(exceptionMessage, ioEx);
         } finally {
-            log.trace("Exiting readSocketInputStream - size = {}", size());
+            log.trace("readSocketInputStream(socketInputStream, {}) - exiting with buffer size = {}", socket, size());
         }
     }
 
@@ -675,7 +675,7 @@ public class MllpSocketBuffer {
                 try {
                     socket.shutdownInput();
                 } catch (IOException ignoredEx) {
-                    // TODO: Maybe log this
+                    log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.shutdownInput()", socket, logMessage, reset, ignoredEx);
                 }
             }
 
@@ -683,17 +683,17 @@ public class MllpSocketBuffer {
                 try {
                     socket.shutdownOutput();
                 } catch (IOException ignoredEx) {
-                    // TODO: Maybe log this
+                    log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.shutdownOutput()", socket, logMessage, reset, ignoredEx);
                 }
             }
 
             if (reset) {
+                final boolean on = true;
+                final int linger = 0;
                 try {
-                    final boolean on = true;
-                    final int linger = 0;
                     socket.setSoLinger(on, linger);
                 } catch (IOException ignoredEx) {
-                    // TODO: Maybe log this
+                    log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.setSoLinger({}, {})", socket, logMessage, reset, on, linger, ignoredEx);
                 }
             }
 
@@ -701,6 +701,7 @@ public class MllpSocketBuffer {
                 socket.close();
             } catch (IOException ignoredEx) {
                 // TODO: Maybe log this
+                log.trace("doSocketClose(socket[{}], logMessage[{}], reset[{}] - ignoring exception raised by Socket.close()", socket, logMessage, reset, ignoredEx);
             }
         }
     }
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
index cd87fc6..ee473c4 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 /**
@@ -38,6 +39,21 @@ public class MllpSocketBufferTest extends SocketBufferTestSupport {
      * @throws Exception in the event of a test error.
      */
     @Test
+    public void testConstructorWithNullEndpoing() throws Exception {
+        try {
+            new MllpSocketBuffer(null);
+            fail("Constructor should have thrown an exception with a null Endpoint argument");
+        } catch (IllegalArgumentException expectedEx) {
+            assertEquals("MllpEndpoint cannot be null", expectedEx.getMessage());
+        }
+    }
+
+    /**
+     * Description of test.
+     *
+     * @throws Exception in the event of a test error.
+     */
+    @Test
     public void testToHl7StringWithRequiredEndOfData() throws Exception {
         assertNull(instance.toHl7String());
 
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
index 903669e..ae097ac 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
@@ -228,7 +228,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport {
             instance.write(payload, -5, payload.length);
             fail("Exception should have been thrown");
         } catch (IndexOutOfBoundsException expectedEx) {
-            assertEquals("offset <-5> is less than zero", expectedEx.getMessage());
+            assertEquals("write(byte[4], offset[-5], writeCount[4]) - offset is less than zero", expectedEx.getMessage());
         }
     }
 
@@ -245,7 +245,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport {
             instance.write(payload, payload.length + 1, payload.length);
             fail("Exception should have been thrown");
         } catch (IndexOutOfBoundsException expectedEx) {
-            assertEquals("offset <5> is greater than write count <4>", expectedEx.getMessage());
+            assertEquals("write(byte[4], offset[5], writeCount[4]) - offset is greater than write count", expectedEx.getMessage());
         }
     }
 
@@ -261,7 +261,7 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport {
             instance.write("BLAH".getBytes(), 0, -5);
             fail("Exception should have been thrown");
         } catch (IndexOutOfBoundsException expectedEx) {
-            assertEquals("write count <-5> is less than zero", expectedEx.getMessage());
+            assertEquals("write(byte[4], offset[0], writeCount[-5]) - write count is less than zero", expectedEx.getMessage());
         }
     }
 
@@ -278,21 +278,21 @@ public class MllpSocketBufferWriteTest extends SocketBufferTestSupport {
             instance.write(payload, 0, payload.length + 1);
             fail("Exception should have been thrown");
         } catch (IndexOutOfBoundsException expectedEx) {
-            assertEquals("write count <5> is greater than length of the source byte[] <4>", expectedEx.getMessage());
+            assertEquals("write(byte[4], offset[0], writeCount[5]) - write count is greater than length of the source byte[]", expectedEx.getMessage());
         }
 
         try {
             instance.write("BLAH".getBytes(), 1, payload.length);
             fail("Exception should have been thrown");
         } catch (IndexOutOfBoundsException expectedEx) {
-            assertEquals("offset <1> plus write count <4> is <5> is greater than length <4> of the source byte[]", expectedEx.getMessage());
+            assertEquals("write(byte[4], offset[1], writeCount[4]) - offset plus write count <5> is greater than length of the source byte[]", expectedEx.getMessage());
         }
 
         try {
             instance.write("BLAH".getBytes(), 2, payload.length - 1);
             fail("Exception should have been thrown");
         } catch (IndexOutOfBoundsException expectedEx) {
-            assertEquals("offset <2> plus write count <3> is <5> is greater than length <4> of the source byte[]", expectedEx.getMessage());
+            assertEquals("write(byte[4], offset[2], writeCount[3]) - offset plus write count <5> is greater than length of the source byte[]", expectedEx.getMessage());
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
quinn@apache.org.

[camel] 01/02: CAMEL-12325: Correct idleTimeout behaviour for the MllpTcpClientProducer

Posted by qu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 14d2cf99cd0fbd85ab5a8383e4516acdffb65a7b
Author: Quinn Stevenson <qu...@apache.org>
AuthorDate: Thu Mar 15 07:49:00 2018 -0600

    CAMEL-12325: Correct idleTimeout behaviour for the MllpTcpClientProducer
---
 .../apache/camel/component/mllp/MllpEndpoint.java  |  11 +-
 .../component/mllp/MllpTcpClientProducer.java      | 111 ++++++++++++++-------
 .../component/mllp/internal/MllpSocketBuffer.java  |  10 +-
 .../MllpTcpClientProducerConnectionErrorTest.java  |  53 ++++++----
 4 files changed, 121 insertions(+), 64 deletions(-)

diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index ae8975b..dbadfa8 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -53,7 +53,8 @@ import org.slf4j.LoggerFactory;
  * <p/>
  */
 @ManagedResource(description = "MLLP Endpoint")
-@UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
+// @UriEndpoint(scheme = "mllp", firstVersion = "2.17.0", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
+@UriEndpoint(scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
 public class MllpEndpoint extends DefaultEndpoint {
     // Use constants from MllpProtocolConstants
     @Deprecated()
@@ -175,6 +176,14 @@ public class MllpEndpoint extends DefaultEndpoint {
         return lastConnectionTerminatedTicks != null ? new Date(lastConnectionTerminatedTicks) : null;
     }
 
+    public boolean hasLastConnectionActivityTicks() {
+        return lastConnectionActivityTicks != null && lastConnectionActivityTicks > 0;
+    }
+
+    public Long getLastConnectionActivityTicks() {
+        return lastConnectionActivityTicks;
+    }
+
     public void updateLastConnectionActivityTicks() {
         updateLastConnectionActivityTicks(System.currentTimeMillis());
     }
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 9b7ca63..df5705a 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -39,6 +39,10 @@ import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.mllp.internal.Hl7Util;
 import org.apache.camel.component.mllp.internal.MllpSocketBuffer;
 import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
 
 /**
  * The MLLP producer.
@@ -50,7 +54,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
     final MllpSocketBuffer mllpBuffer;
 
     ScheduledExecutorService idleTimeoutExecutor;
-    long lastProcessCallTicks = -1;
+    // long lastProcessCallTicks = -1;
 
     private String cachedLocalAddress;
     private String cachedRemoteAddress;
@@ -65,7 +69,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
 
     @ManagedAttribute(description = "Last activity time")
     public Date getLastActivityTime() {
-        return new Date(lastProcessCallTicks);
+        return getEndpoint().getLastConnectionActivityTime();
     }
 
     @ManagedAttribute(description = "Connection")
@@ -122,8 +126,8 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
     }
 
     @Override
-    public synchronized void process(Exchange exchange) throws Exception {
-        log.trace("Processing Exchange {}", exchange.getExchangeId());
+    public synchronized void process(Exchange exchange) throws MllpException {
+        log.trace("Processing Exchange {} for {}", exchange.getExchangeId(), socket);
         getEndpoint().updateLastConnectionActivityTicks();
 
         Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -158,77 +162,96 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                 }
             }
 
-            log.debug("Sending message to external system");
-            getEndpoint().updateLastConnectionEstablishedTicks();
+            log.debug("Sending message to external system {}", socket);
 
             try {
                 mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                 mllpBuffer.writeTo(socket);
             } catch (MllpSocketException writeEx) {
                 // Connection may have been reset - try one more time
-                log.debug("Exception encountered reading acknowledgement - attempting reconnect", writeEx);
+                log.debug("Exception encountered writing payload to {} - attempting reconnect", writeEx, socket);
                 try {
                     checkConnection();
-                    log.trace("Reconnected succeeded - resending payload");
+                    log.trace("Reconnected succeeded - resending payload to {}", socket);
                     try {
                         mllpBuffer.writeTo(socket);
                     } catch (MllpSocketException retryWriteEx) {
-                        exchange.setException(retryWriteEx);
+                        log.warn("Exception encountered attempting to write payload to {} after reconnect - sending original exception to exchange", socket, retryWriteEx);
+                        exchange.setException(new MllpWriteException("Exception encountered writing payload after reconnect", mllpBuffer.toByteArrayAndReset(), retryWriteEx));
                     }
                 } catch (IOException reconnectEx) {
-                    log.debug("Reconnected failed - sending exception to exchange", reconnectEx);
-                    exchange.setException(reconnectEx);
+                    log.warn("Exception encountered attempting to reconnect - sending original exception to exchange", reconnectEx);
+                    exchange.setException(new MllpWriteException("Exception encountered writing payload", mllpBuffer.toByteArrayAndReset(), writeEx));
+                    mllpBuffer.resetSocket(socket);
                 }
-
             }
 
             if (exchange.getException() == null) {
-                log.debug("Reading acknowledgement from external system");
+                log.debug("Reading acknowledgement from external system {}", socket);
                 try {
                     mllpBuffer.reset();
                     mllpBuffer.readFrom(socket);
                 } catch (MllpSocketException receiveAckEx) {
                     // Connection may have been reset - try one more time
-                    log.debug("Exception encountered reading acknowledgement - attempting reconnect", receiveAckEx);
+                    log.debug("Exception encountered reading acknowledgement from {} - attempting reconnect", socket, receiveAckEx);
                     try {
                         checkConnection();
                     } catch (IOException reconnectEx) {
-                        log.debug("Reconnected failed - sending original exception to exchange", reconnectEx);
+                        log.warn("Exception encountered attempting to reconnect after acknowledgement read failure - sending original acknowledgement exception to exchange", reconnectEx);
                         exchange.setException(new MllpAcknowledgementReceiveException("Exception encountered reading acknowledgement", hl7MessageBytes, receiveAckEx));
+                        mllpBuffer.resetSocket(socket);
                     }
 
                     if (exchange.getException() == null) {
-                        log.trace("Reconnected succeeded - resending payload");
+                        log.trace("Reconnected succeeded - resending payload to {}", socket);
                         try {
                             mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                             mllpBuffer.writeTo(socket);
                         } catch (MllpSocketException writeRetryEx) {
-                            exchange.setException(new MllpWriteException("Failed to write HL7 message to socket", hl7MessageBytes, writeRetryEx));
+                            log.warn("Exception encountered attempting to write payload to {} after read failure and successful reconnect - sending original exception to exchange",
+                                socket, writeRetryEx);
+                            exchange.setException(new MllpWriteException("Exception encountered writing payload after read failure and reconnect", hl7MessageBytes, receiveAckEx));
                         }
 
                         if (exchange.getException() == null) {
-                            log.trace("Resend succeeded - reading acknowledgement");
+                            log.trace("Resend succeeded - reading acknowledgement from {}", socket);
                             try {
                                 mllpBuffer.reset();
                                 mllpBuffer.readFrom(socket);
                             } catch (MllpSocketException secondReceiveEx) {
                                 if (mllpBuffer.isEmpty()) {
-                                    Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving Acknowledgement", hl7MessageBytes, secondReceiveEx);
+                                    log.warn("Exception encountered reading acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx);
+                                    Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving Acknowledgement", hl7MessageBytes, receiveAckEx);
                                     exchange.setException(exchangeEx);
                                 } else {
                                     byte[] partialAcknowledgment = mllpBuffer.toByteArray();
                                     mllpBuffer.reset();
+                                    log.warn("Exception encountered reading a complete acknowledgement from {} after successful reconnect and resend", socket, secondReceiveEx);
                                     Exception exchangeEx = new MllpAcknowledgementReceiveException("Exception encountered receiving complete Acknowledgement",
-                                        hl7MessageBytes, partialAcknowledgment, secondReceiveEx);
+                                        hl7MessageBytes, partialAcknowledgment, receiveAckEx);
                                     exchange.setException(exchangeEx);
                                 }
+                            } catch (SocketTimeoutException secondReadTimeoutEx) {
+                                if (mllpBuffer.isEmpty()) {
+                                    log.warn("Timeout receiving HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx);
+                                    exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement after successful reconnect",
+                                        hl7MessageBytes, secondReadTimeoutEx));
+                                } else {
+                                    log.warn("Timeout receiving complete HL7 Acknowledgment from {} after successful reconnect", socket, secondReadTimeoutEx);
+                                    exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement after successful reconnect",
+                                        hl7MessageBytes, mllpBuffer.toByteArray(), secondReadTimeoutEx));
+                                    mllpBuffer.reset();
+                                }
+                                mllpBuffer.resetSocket(socket);
                             }
                         }
                     }
                 } catch (SocketTimeoutException timeoutEx) {
                     if (mllpBuffer.isEmpty()) {
+                        log.warn("Timeout receiving HL7 Acknowledgment from {}", socket, timeoutEx);
                         exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving HL7 Acknowledgement", hl7MessageBytes, timeoutEx));
                     } else {
+                        log.warn("Timeout receiving complete HL7 Acknowledgment from {}", socket, timeoutEx);
                         exchange.setException(new MllpAcknowledgementTimeoutException("Timeout receiving complete HL7 Acknowledgement", hl7MessageBytes, mllpBuffer.toByteArray(), timeoutEx));
                         mllpBuffer.reset();
                     }
@@ -239,7 +262,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                     if (mllpBuffer.hasCompleteEnvelope()) {
                         byte[] acknowledgementBytes = mllpBuffer.toMllpPayload();
 
-                        log.debug("Populating message headers with the acknowledgement from the external system");
+                        log.debug("Populating message headers with the acknowledgement from the external system {}", socket);
                         message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
                         if (acknowledgementBytes != null && acknowledgementBytes.length > 0) {
                             message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, getConfiguration().getCharset(exchange, acknowledgementBytes)));
@@ -255,7 +278,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                         }
 
                         if (exchange.getException() == null) {
-                            log.debug("Processing the acknowledgement from the external system");
+                            log.debug("Processing the acknowledgement from the external system {}", socket);
                             try {
                                 message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, processAcknowledgment(hl7MessageBytes, acknowledgementBytes));
                             } catch (MllpNegativeAcknowledgementException nackEx) {
@@ -272,6 +295,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
             }
 
         } catch (IOException ioEx) {
+            log.debug("Exception encountered checking connection {}", socket, ioEx);
             exchange.setException(ioEx);
             mllpBuffer.resetSocket(socket);
         } finally {
@@ -358,26 +382,34 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
      */
     void checkConnection() throws IOException {
         if (null == socket || socket.isClosed() || !socket.isConnected()) {
-            socket = new Socket();
+            if (socket == null) {
+                log.debug("checkConnection() - Socket is null - attempting to establish connection", socket);
+            } else if (socket.isClosed()) {
+                log.info("checkConnection() - Socket {} is closed - attempting to establish new connection", socket);
+            } else if (!socket.isConnected()) {
+                log.info("checkConnection() - Socket {} is not connected - attempting to establish new connection", socket);
+            }
+
+            Socket newSocket = new Socket();
 
             if (getConfiguration().hasKeepAlive()) {
-                socket.setKeepAlive(getConfiguration().getKeepAlive());
+                newSocket.setKeepAlive(getConfiguration().getKeepAlive());
             }
             if (getConfiguration().hasTcpNoDelay()) {
-                socket.setTcpNoDelay(getConfiguration().getTcpNoDelay());
+                newSocket.setTcpNoDelay(getConfiguration().getTcpNoDelay());
             }
 
             if (getConfiguration().hasReceiveBufferSize()) {
-                socket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize());
+                newSocket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize());
             }
             if (getConfiguration().hasSendBufferSize()) {
-                socket.setSendBufferSize(getConfiguration().getSendBufferSize());
+                newSocket.setSendBufferSize(getConfiguration().getSendBufferSize());
             }
             if (getConfiguration().hasReuseAddress()) {
-                socket.setReuseAddress(getConfiguration().getReuseAddress());
+                newSocket.setReuseAddress(getConfiguration().getReuseAddress());
             }
 
-            socket.setSoLinger(false, -1);
+            newSocket.setSoLinger(false, -1);
 
             InetSocketAddress socketAddress;
             if (null == getEndpoint().getHostname()) {
@@ -386,7 +418,12 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                 socketAddress = new InetSocketAddress(getEndpoint().getHostname(), getEndpoint().getPort());
             }
 
-            socket.connect(socketAddress, getConfiguration().getConnectTimeout());
+            newSocket.connect(socketAddress, getConfiguration().getConnectTimeout());
+            log.info("checkConnection() - established new connection {}", newSocket);
+            getEndpoint().updateLastConnectionEstablishedTicks();
+
+            socket = newSocket;
+
             SocketAddress localSocketAddress = socket.getLocalSocketAddress();
             if (localSocketAddress != null) {
                 cachedLocalAddress = localSocketAddress.toString();
@@ -397,14 +434,12 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
             }
             cachedCombinedAddress = MllpSocketBuffer.formatAddressString(localSocketAddress, remoteSocketAddress);
 
-            log.info("checkConnection() - established new connection {}", cachedCombinedAddress);
-            getEndpoint().updateLastConnectionEstablishedTicks();
-
             if (getConfiguration().hasIdleTimeout()) {
+                log.debug("Scheduling initial idle producer connection check of {} in {} milliseconds", getConnectionAddress(), getConfiguration().getIdleTimeout());
                 idleTimeoutExecutor.schedule(this, getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
             }
         } else {
-            log.debug("checkConnection() - Connection is still valid - no new connection required");
+            log.debug("checkConnection() - Connection {} is still valid - no new connection required", socket);
         }
     }
 
@@ -415,13 +450,13 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
     public synchronized void run() {
         if (getConfiguration().hasIdleTimeout()) {
             if (null != socket && !socket.isClosed() && socket.isConnected()) {
-                if (lastProcessCallTicks > 0) {
-                    long idleTime = System.currentTimeMillis() - lastProcessCallTicks;
+                if (getEndpoint().hasLastConnectionActivityTicks()) {
+                    long idleTime = System.currentTimeMillis() - getEndpoint().getLastConnectionActivityTicks();
                     if (log.isDebugEnabled()) {
                         log.debug("Checking {} for idle connection: {} - {}", getConnectionAddress(), idleTime, getConfiguration().getIdleTimeout());
                     }
                     if (idleTime >= getConfiguration().getIdleTimeout()) {
-                        log.info("MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - resetting conection",
+                        log.info("MLLP Connection idle time of '{}' milliseconds met or exceeded the idle producer timeout of '{}' milliseconds - resetting connection",
                             idleTime, getConfiguration().getIdleTimeout());
                         mllpBuffer.resetSocket(socket);
                     } else {
@@ -433,7 +468,7 @@ public class MllpTcpClientProducer extends DefaultProducer implements Runnable {
                         idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS);
                     }
                 } else {
-                    log.debug("Scheduling idle producer connection check in {} milliseconds", getConfiguration().getIdleTimeout());
+                    log.debug("No activity detected since initial connection - scheduling idle producer connection check in {} milliseconds", getConfiguration().getIdleTimeout());
                     idleTimeoutExecutor.schedule(this, getConfiguration().getIdleTimeout(), TimeUnit.MILLISECONDS);
                 }
             }
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
index 020ec74..1d7daf6 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
@@ -175,8 +175,8 @@ public class MllpSocketBuffer {
     }
 
     public synchronized void readFrom(Socket socket, int receiveTimeout, int readTimeout) throws MllpSocketException, SocketTimeoutException {
-        log.trace("Entering readFrom ...");
         if (socket != null && socket.isConnected() && !socket.isClosed()) {
+            log.trace("Entering readFrom for {} ...", socket);
             ensureCapacity(MIN_BUFFER_SIZE);
 
             try {
@@ -203,7 +203,7 @@ public class MllpSocketBuffer {
             } finally {
                 if (size() > 0 && !hasCompleteEnvelope()) {
                     if (!hasEndOfData() && hasEndOfBlock() && endOfBlockIndex < size() - 1) {
-                        log.warn("readFrom exiting with partial payload ", Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1));
+                        log.warn("readFrom {} exiting with partial payload {}", socket, Hl7Util.convertToPrintFriendlyString(buffer, 0, size() - 1));
                     }
                 }
             }
@@ -216,8 +216,8 @@ public class MllpSocketBuffer {
     }
 
     public synchronized void writeTo(Socket socket) throws MllpSocketException {
-        log.trace("Entering writeTo ...");
         if (socket != null && socket.isConnected() && !socket.isClosed()) {
+            log.trace("Entering writeTo for {} ...", socket);
             if (!isEmpty()) {
                 try {
                     OutputStream socketOutputStream = socket.getOutputStream();
@@ -243,7 +243,7 @@ public class MllpSocketBuffer {
                     throw new MllpSocketException(exceptionMessage, ioEx);
                 }
             } else {
-                log.warn("Ignoring call to writeTo(byte[] payload) - MLLP payload is null or empty");
+                log.warn("Ignoring call to writeTo(byte[] payload) for {} - MLLP payload is null or empty", socket);
             }
         } else {
             log.warn("Socket is invalid - no data written");
@@ -617,7 +617,7 @@ public class MllpSocketBuffer {
             int readCount = socketInputStream.read(buffer, availableByteCount, buffer.length - availableByteCount);
             if (readCount == MllpProtocolConstants.END_OF_STREAM) {
                 resetSocket(socket);
-                throw new SocketException("END_OF_STREAM returned from SocketInputStream.read(byte[], off, len)");
+                throw new MllpSocketException("END_OF_STREAM returned from SocketInputStream.read(byte[], off, len)");
             }
             if (readCount > 0) {
                 for (int i = 0; (startOfBlockIndex == -1 || endOfBlockIndex == -1) && i < readCount; ++i) {
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
index c348438..dbc6575 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java
@@ -45,6 +45,9 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
     @EndpointInject(uri = "direct://source")
     ProducerTemplate source;
 
+    @EndpointInject(uri = "mock://target")
+    MockEndpoint target;
+
     @EndpointInject(uri = "mock://complete")
     MockEndpoint complete;
 
@@ -54,8 +57,8 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
     @EndpointInject(uri = "mock://connect-ex")
     MockEndpoint connectEx;
 
-    @EndpointInject(uri = "mock://receive-ex")
-    MockEndpoint receiveEx;
+    @EndpointInject(uri = "mock://acknowledgement-ex")
+    MockEndpoint acknowledgementEx;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -74,6 +77,9 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
             String routeId = "mllp-sender";
 
             public void configure() {
+                onCompletion()
+                    .to(complete);
+
                 onException(ConnectException.class)
                     .handled(true)
                     .to(connectEx)
@@ -86,17 +92,17 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
                     .log(LoggingLevel.ERROR, routeId, "Write Error")
                     .stop();
 
-                onException(MllpAcknowledgementReceiveException.class)
+                onException(MllpAcknowledgementException.class)
                     .handled(true)
-                    .to(receiveEx)
-                    .log(LoggingLevel.ERROR, routeId, "Receive Error")
+                    .to(acknowledgementEx)
+                    .log(LoggingLevel.ERROR, routeId, "Acknowledgement Error")
                     .stop();
 
                 from(source.getDefaultEndpoint()).routeId(routeId)
                     .log(LoggingLevel.INFO, routeId, "Sending Message")
                     .toF("mllp://%s:%d", mllpServer.getListenHost(), mllpServer.getListenPort())
                     .log(LoggingLevel.INFO, routeId, "Received Acknowledgement")
-                    .to(complete);
+                    .to(target);
             }
         };
     }
@@ -108,10 +114,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
      */
     @Test
     public void testConnectionClosedBeforeSendingHL7Message() throws Exception {
+        target.expectedMessageCount(2);
         complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(0);
+        acknowledgementEx.expectedMessageCount(0);
 
         NotifyBuilder oneDone = new NotifyBuilder(context).whenCompleted(1).create();
         NotifyBuilder twoDone = new NotifyBuilder(context).whenCompleted(2).create();
@@ -136,10 +143,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
      */
     @Test()
     public void testConnectionResetBeforeSendingHL7Message() throws Exception {
+        target.expectedMessageCount(2);
         complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(0);
+        acknowledgementEx.expectedMessageCount(0);
 
         NotifyBuilder oneDone = new NotifyBuilder(context).whenCompleted(1).create();
         NotifyBuilder twoDone = new NotifyBuilder(context).whenCompleted(2).create();
@@ -158,10 +166,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
 
     @Test()
     public void testConnectionClosedBeforeReadingAcknowledgement() throws Exception {
-        complete.expectedMessageCount(0);
+        target.expectedMessageCount(0);
+        complete.expectedMessageCount(1);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(1);
+        acknowledgementEx.expectedMessageCount(1);
 
         mllpServer.setCloseSocketBeforeAcknowledgementModulus(1);
 
@@ -176,10 +185,11 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
 
     @Test()
     public void testConnectionResetBeforeReadingAcknowledgement() throws Exception {
-        complete.expectedMessageCount(0);
+        target.expectedMessageCount(0);
+        complete.expectedMessageCount(1);
         connectEx.expectedMessageCount(0);
         writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(1);
+        acknowledgementEx.expectedMessageCount(1);
 
         mllpServer.setResetSocketBeforeAcknowledgementModulus(1);
 
@@ -195,7 +205,8 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
 
     @Test()
     public void testServerShutdownBeforeSendingHL7Message() throws Exception {
-        complete.expectedMessageCount(1);
+        target.expectedMessageCount(1);
+        complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
 
         NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create();
@@ -212,12 +223,13 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
         assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
 
         // Depending on the timing, either a write or a receive exception will be thrown
-        assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + receiveEx.getExchanges().size());
+        assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + acknowledgementEx.getExchanges().size());
     }
 
     @Test()
     public void testConnectionCloseAndServerShutdownBeforeSendingHL7Message() throws Exception {
-        complete.expectedMessageCount(1);
+        target.expectedMessageCount(1);
+        complete.expectedMessageCount(2);
         connectEx.expectedMessageCount(0);
 
         NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create();
@@ -235,15 +247,16 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport {
         assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
 
         // Depending on the timing, either a write or a receive exception will be thrown
-        assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + receiveEx.getExchanges().size());
+        assertEquals("Either a write or a receive exception should have been be thrown", 1, writeEx.getExchanges().size() + acknowledgementEx.getExchanges().size());
     }
 
     @Test()
     public void testConnectionResetAndServerShutdownBeforeSendingHL7Message() throws Exception {
-        complete.expectedMessageCount(1);
-        connectEx.expectedMessageCount(1);
-        writeEx.expectedMessageCount(0);
-        receiveEx.expectedMessageCount(0);
+        target.expectedMessageCount(1);
+        complete.expectedMessageCount(2);
+        connectEx.expectedMessageCount(0);
+        writeEx.expectedMessageCount(1);
+        acknowledgementEx.expectedMessageCount(0);
 
         NotifyBuilder done = new NotifyBuilder(context).whenCompleted(2).create();
 

-- 
To stop receiving notification emails like this one, please contact
quinn@apache.org.