You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by qu...@apache.org on 2018/01/31 20:07:12 UTC

[camel] branch master updated: CAMEL-12218 - make bridgeErrorHandler configurable

This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 005b2c9  CAMEL-12218 - make bridgeErrorHandler configurable
005b2c9 is described below

commit 005b2c9a78a357d76d9396b379825d3c033dfac7
Author: Quinn Stevenson <qu...@apache.org>
AuthorDate: Wed Jan 31 13:05:24 2018 -0700

    CAMEL-12218 - make bridgeErrorHandler configurable
---
 .../camel-mllp/src/main/docs/mllp-component.adoc   |  93 +++---
 .../apache/camel/component/mllp/MllpComponent.java |  12 +-
 .../camel/component/mllp/MllpConfiguration.java    |  79 ++++-
 .../apache/camel/component/mllp/MllpEndpoint.java  |  20 +-
 .../component/mllp/MllpTcpServerConsumer.java      | 364 ++++++++++++++++++++-
 .../mllp/internal/TcpServerBindThread.java         |   3 +-
 .../mllp/internal/TcpSocketConsumerRunnable.java   | 364 ++-------------------
 .../springboot/MllpComponentConfiguration.java     |  75 ++++-
 8 files changed, 601 insertions(+), 409 deletions(-)

diff --git a/components/camel-mllp/src/main/docs/mllp-component.adoc b/components/camel-mllp/src/main/docs/mllp-component.adoc
index 972fa31..e332960 100644
--- a/components/camel-mllp/src/main/docs/mllp-component.adoc
+++ b/components/camel-mllp/src/main/docs/mllp-component.adoc
@@ -47,8 +47,8 @@ The MLLP component supports 4 options which are listed below.
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *logPhi* (common) | Set the component to log PHI data. |  | Boolean
-| *logPhiMaxBytes* (common) | Set the maximum number of bytes of PHI that will be logged in a log entry. |  | Integer
+| *logPhi* (advanced) | Set the component to log PHI data. | true | Boolean
+| *logPhiMaxBytes* (advanced) | Set the maximum number of bytes of PHI that will be logged in a log entry. | 5120 | Integer
 | *configuration* (common) | Sets the default configuration to use when creating MLLP endpoints. |  | MllpConfiguration
 | *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
 |===
@@ -85,29 +85,29 @@ with the following path and query parameters:
 |===
 | Name | Description | Default | Type
 | *autoAck* (common) | Enable/Disable the automatic generation of a MLLP Acknowledgement MLLP Consumers only | true | boolean
-| *bufferWrites* (common) | *Deprecated* Enable/Disable the validation of HL7 Payloads | false | boolean
+| *bufferWrites* (common) | *Deprecated* Enable/Disable the buffering of HL7 payloads before writing to the socket. | false | boolean
 | *hl7Headers* (common) | Enable/Disable the automatic generation of message headers from the HL7 Message MLLP Consumers only | true | boolean
-| *requireEndOfData* (common) | Enable disable strict compliance to the MLLP standard. The MLLP standard specifies START_OF_BLOCKhl7 payloadEND_OF_BLOCKEND_OF_DATA however some systems do not send the final END_OF_DATA byte. This setting controls whether or not the final END_OF_DATA byte is required or optional. | true | boolean
+| *requireEndOfData* (common) | Enable/Disable strict compliance to the MLLP standard. The MLLP standard specifies START_OF_BLOCKhl7 payloadEND_OF_BLOCKEND_OF_DATA however some systems do not send the final END_OF_DATA byte. This setting controls whether or not the final END_OF_DATA byte is required or optional. | true | boolean
 | *stringPayload* (common) | Enable/Disable converting the payload to a String. If enabled HL7 Payloads received from external systems will be validated converted to a String. If the charsetName property is set that character set will be used for the conversion. If the charsetName property is not set the value of MSH-18 will be used to determine th appropriate character set. If MSH-18 is not set then the default ASCII character set will be use. | true | boolean
 | *validatePayload* (common) | Enable/Disable the validation of HL7 Payloads If enabled HL7 Payloads received from external systems will be validated (see Hl7Util.generateInvalidPayloadExceptionMessage for details on the validation). If and invalid payload is detected a MllpInvalidMessageException (for consumers) or a MllpInvalidAcknowledgementException will be thrown. | false | boolean
-| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which means any exceptions that occur while the consumer is trying to receive incoming messages or the like will now be processed as a message and handled by the routing Error Handler. If disabled the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions by logging them at WARN or ERROR level and ignoring them. | true | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
-| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
-| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | InOut | ExchangePattern
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used (this component only supports synchronous operations). | true | boolean
 | *backlog* (tcp) | The maximum queue length for incoming connection indications (a request to connect) is set to the backlog parameter. If a connection indication arrives when the queue is full the connection is refused. | 5 | Integer
 | *lenientBind* (tcp) | TCP Server Only - Allow the endpoint to start before the TCP ServerSocket is bound. In some environments it may be desirable to allow the endpoint to start before the TCP ServerSocket is bound. | false | boolean
 | *maxConcurrentConsumers* (tcp) | The maximum number of concurrent MLLP Consumer connections that will be allowed. If a new connection is received and the maximum is number are already established the new connection will be reset immediately. | 5 | int
-| *receiveBufferSize* (tcp) | Sets the SO_RCVBUF option to the specified value (in bytes) | 8192 | Integer
 | *reuseAddress* (tcp) | Enable/disable the SO_REUSEADDR socket option. | false | Boolean
 | *acceptTimeout* (timeout) | Timeout (in milliseconds) while waiting for a TCP connection TCP Server Only | 60000 | int
 | *bindRetryInterval* (timeout) | TCP Server Only - The number of milliseconds to wait between bind attempts | 5000 | int
 | *bindTimeout* (timeout) | TCP Server Only - The number of milliseconds to retry binding to a server port | 30000 | int
-| *tcpNoDelay* (tcp) | Enable/disable the TCP_NODELAY socket option. | true | Boolean
 | *connectTimeout* (timeout) | Timeout (in milliseconds) for establishing for a TCP connection TCP Client only | 30000 | int
 | *idleTimeout* (timeout) | The approximate idle time allowed before the Client TCP Connection will be reset. A null value or a value less than or equal to zero will disable the idle timeout. |  | Integer
 | *maxReceiveTimeouts* (timeout) | *Deprecated* The maximum number of timeouts (specified by receiveTimeout) allowed before the TCP Connection will be reset. |  | Integer
 | *keepAlive* (tcp) | Enable/disable the SO_KEEPALIVE socket option. | true | Boolean
+| *receiveBufferSize* (tcp) | Sets the SO_RCVBUF option to the specified value (in bytes) | 8192 | Integer
 | *sendBufferSize* (tcp) | Sets the SO_SNDBUF option to the specified value (in bytes) | 8192 | Integer
+| *tcpNoDelay* (tcp) | Enable/disable the TCP_NODELAY socket option. | true | Boolean
 | *readTimeout* (timeout) | The SO_TIMEOUT value (in milliseconds) used after the start of an MLLP frame has been received | 500 | int
 | *receiveTimeout* (timeout) | The SO_TIMEOUT value (in milliseconds) used when waiting for the start of an MLLP frame | 15000 | int
 | *charsetName* (codec) | Set the CamelCharsetName property on the exchange |  | String
@@ -136,22 +136,22 @@ The MLLP Consumer adds these headers on the Camel message:
 
 [width="100%",cols="<34%,<33%,<33%",]
 |===========================================
-|*Key* |*MSH field* |*Example*
-|CamelMllpLocalAddress |  | 
-|CamelMllpRemoteAddress |  | 
-|CamelMllpSendingApplication |MSH-3 |APPA
-|CamelMllpSendingFacility |MSH-4 |FACA
-|CamelMllpReceivingApplication |MSH-5 |CAMEL
-|CamelMllpReceivingFacility |MSH-6 |FACB
-|CamelMllpTimestamp |MSH-7 |20150106235900
-|CamelMllpSecurity |MSH-8 | 
-|CamelMllpMessageType |MSH-9 |ADT^A04
-|CamelMllpEventType |MSH-9-1 |AD4
-|CamelMllpTriggerEvent |MSH-9-2 |A04
-|CamelMllpMessageControlId |MSH-10 |12345
-|CamelMllpProcessingId |MSH-11 |P
-|CamelMllpVersionId |MSH-12 |2.3.1
-|CamelMllpCharset |MSH-18 | 
+|*Key* |*Description* |
+|CamelMllpLocalAddress | The local TCP Address of the Socket |
+|CamelMllpRemoteAddress | The remote TCP Address of the Socket |
+|CamelMllpSendingApplication | MSH-3 value |
+|CamelMllpSendingFacility | MSH-4 value |
+|CamelMllpReceivingApplication | MSH-5 value |
+|CamelMllpReceivingFacility | MSH-6 value |
+|CamelMllpTimestamp | MSH-7 value |
+|CamelMllpSecurity | MSH-8 value |
+|CamelMllpMessageType | MSH-9 value |
+|CamelMllpEventType | MSH-9-1 value |
+|CamelMllpTriggerEvent | MSH-9-2 value |
+|CamelMllpMessageControlId | MSH-10 value |
+|CamelMllpProcessingId | MSH-11 value |
+|CamelMllpVersionId | MSH-12 value |
+|CamelMllpCharset | MSH-18 value |
 |===========================================
 
 All headers are String types. If a header value is missing, its value
@@ -159,19 +159,23 @@ is null.
 
 ### *Exchange Properties*
 
-The type of acknowledgment the MLLP Consumer generates can be controlled
+The type of acknowledgment the MLLP Consumer generates and the state of the TCP Socket can be controlled
 by these properties on the Camel exchange:
 
 [width="100%",cols="<34%,<33%,<33%",]
 |====================================
-|*Key* |  |*Example*
-|CamelMllpAcknowledgement |  | 
-|CamelMllpAcknowledgementType |  | AR
+|*Key* |*Type* |*Description*
+|CamelMllpAcknowledgement | byte[] | If present, this property will be sent to the client as the MLLP Acknowledgement
+|CamelMllpAcknowledgementString | String | If present and CamelMllpAcknowledgement is not present, this property will be sent to the client as the MLLP Acknowledgement
+|CamelMllpAcknowledgementType | String  | If neither CamelMllpAcknowledgement nor CamelMllpAcknowledgementString is present and autoAck is true, this property can be used to specify the HL7 acknowledgement type (e.g. AA, AE, AR)
+|CamelMllpAutoAcknowledge | Boolean | Overrides the autoAck query parameter
+
+|CamelMllpCloseConnectionBeforeSend | Boolean | If true, the Socket will be closed before sending data
+|CamelMllpResetConnectionBeforeSend | Boolean | If true, the Socket will be reset before sending data
+|CamelMllpCloseConnectionAfterSend | Boolean | If true, the Socket will be closed immediately after sending data
+|CamelMllpResetConnectionAfterSend | Boolean | If true, the Socket will be reset immediately after sending any data
 |====================================
 
-All headers are String types. If a header value is missing, its value
-is null.
-
 ### Consumer Configuration
 
 ### MLLP Producer
@@ -188,12 +192,23 @@ The MLLP Producer adds these headers on the Camel message:
 
 [width="100%",cols="<34%,<33%,<33%",]
 |===================================
-|*Key* |*MSH field* |*Example*
-|CamelMllpLocalAddress |  | 
-|CamelMllpRemoteAddress |  | 
-|CamelMllpAcknowledgement |  | 
-|CamelMllpAcknowledgementType |  |AA
+|*Key* |*Description* |
+|CamelMllpLocalAddress | The local TCP Address of the Socket | 
+|CamelMllpRemoteAddress | The remote TCP Address of the Socket | 
+|CamelMllpAcknowledgement | The HL7 Acknowledgment byte[] received | 
+|CamelMllpAcknowledgementString | The HL7 Acknowledgment received, converted to a String | 
+|CamelMllpAcknowledgementType | The HL7 acknowledgement type (AA, AE, AR, etc) | 
 |===================================
 
-All headers are String types. If a header value is missing, its value
-is null.
+### *Exchange Properties*
+
+The state of the TCP Socket can be controlled by these properties on the Camel exchange:
+
+[width="100%",cols="<34%,<33%,<33%",]
+|====================================
+|*Key* |*Type* |*Description*
+|CamelMllpCloseConnectionBeforeSend | Boolean | If true, the Socket will be closed before sending data
+|CamelMllpResetConnectionBeforeSend | Boolean | If true, the Socket will be reset before sending data
+|CamelMllpCloseConnectionAfterSend | Boolean | If true, the Socket will be closed immediately after sending data
+|CamelMllpResetConnectionAfterSend | Boolean | If true, the Socket will be reset immediately after sending any data
+|====================================
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
index 8a43682..acd5da5 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
@@ -21,39 +21,41 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.spi.Metadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Represents the component that manages {@link MllpEndpoint}.
  */
-public class MllpComponent extends UriEndpointComponent {
+public class MllpComponent extends DefaultComponent {
     public static final String MLLP_LOG_PHI_PROPERTY = "org.apache.camel.component.mllp.logPHI";
     public static final String MLLP_LOG_PHI_MAX_BYTES_PROPERTY = "org.apache.camel.component.mllp.logPHI.maxBytes";
     public static final boolean DEFAULT_LOG_PHI = true;
     public static final int DEFAULT_LOG_PHI_MAX_BYTES = 5120;
 
     static Logger log = LoggerFactory.getLogger(MllpComponent.class);
+
+    @Metadata(label = "advanced", defaultValue = "true")
     static Boolean logPhi;
+    @Metadata(label = "advanced", defaultValue = "5120")
     static Integer logPhiMaxBytes;
 
     MllpConfiguration configuration;
 
     public MllpComponent() {
-        super(MllpEndpoint.class);
     }
 
     public MllpComponent(CamelContext context) {
-        super(context, MllpEndpoint.class);
+        super(context);
     }
 
     @Override
     protected Endpoint createEndpoint(String uriString, String remaining, Map<String, Object> parameters) throws Exception {
         MllpEndpoint endpoint = new MllpEndpoint(uriString, this, hasConfiguration() ? configuration.copy() : new MllpConfiguration());
 
-        endpoint.setBridgeErrorHandler(true);
-
         setProperties(endpoint, parameters);
 
         // Make sure it has a host - may just be a port
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java
index 4d5e46b..2554541 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.mllp;
 
 import java.util.Objects;
 
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.slf4j.Logger;
@@ -31,6 +33,17 @@ import org.slf4j.LoggerFactory;
 public class MllpConfiguration implements Cloneable {
     static final Logger LOG = LoggerFactory.getLogger(MllpConfiguration.class);
 
+    // URI Parameters overridden from DefaultEndpoint
+    @UriParam(label = "consumer", defaultValue = "true")
+    boolean bridgeErrorHandler = true;
+
+    @UriParam(label = "consumer,advanced", defaultValue = "InOut")
+    ExchangePattern exchangePattern = ExchangePattern.InOut;
+
+    @UriParam(label = "advanced", defaultValue = "true")
+    boolean synchronous = true;
+
+    // camel-mllp specific URI parameters
     @UriParam(label = "advanced,consumer,tcp", defaultValue = "5")
     Integer backlog = 5;
 
@@ -115,6 +128,10 @@ public class MllpConfiguration implements Cloneable {
         } else if (target == null) {
             LOG.warn("Values were not copied by MllpConfiguration.copy(MllpConfiguration source, MllpConfiguration target) - target argument is null");
         } else {
+            target.bridgeErrorHandler = source.bridgeErrorHandler;
+            target.exchangePattern = source.exchangePattern;
+            target.synchronous = source.synchronous;
+
             target.backlog = source.backlog;
             target.bindTimeout = source.bindTimeout;
             target.bindRetryInterval = source.bindRetryInterval;
@@ -150,6 +167,47 @@ public class MllpConfiguration implements Cloneable {
         MllpConfiguration.copy(source, this);
     }
 
+    public boolean isBridgeErrorHandler() {
+        return bridgeErrorHandler;
+    }
+
+    /**
+     * Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while
+     * the consumer is trying to receive incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler.
+     *
+     * If disabled, the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions by logging them at WARN or ERROR level and ignoring them.
+     *
+     * @param bridgeErrorHandler
+     */
+    public void setBridgeErrorHandler(boolean bridgeErrorHandler) {
+        this.bridgeErrorHandler = bridgeErrorHandler;
+    }
+
+    public ExchangePattern getExchangePattern() {
+        return exchangePattern;
+    }
+
+    /**
+     * Sets the exchange pattern when the consumer creates an exchange.
+     *
+     * @param exchangePattern
+     */
+    public void setExchangePattern(ExchangePattern exchangePattern) {
+        this.exchangePattern = exchangePattern;
+    }
+
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used (this component only supports synchronous operations).
+     *
+     * @param synchronous ignored - this setter does not store the value, so the configured value keeps its default of true
+     */
+    public void setSynchronous(boolean synchronous) {
+    }
+
     public boolean hasCharsetName() {
         return charsetName != null && !charsetName.isEmpty();
     }
@@ -391,7 +449,7 @@ public class MllpConfiguration implements Cloneable {
      *
      * @param reuseAddress enable SO_REUSEADDR when true; disable SO_REUSEADDR when false; use system default when null
      */
-    public void setReuseAddress(boolean reuseAddress) {
+    public void setReuseAddress(Boolean reuseAddress) {
         this.reuseAddress = reuseAddress;
     }
 
@@ -468,7 +526,7 @@ public class MllpConfiguration implements Cloneable {
     }
 
     /**
-     * Enable disable strict compliance to the MLLP standard.
+     * Enable/Disable strict compliance to the MLLP standard.
      *
      * The MLLP standard specifies [START_OF_BLOCK]hl7 payload[END_OF_BLOCK][END_OF_DATA], however, some systems do not send
      * the final END_OF_DATA byte.  This setting controls whether or not the final END_OF_DATA byte is required or optional.
@@ -520,7 +578,7 @@ public class MllpConfiguration implements Cloneable {
     }
 
     /**
-     * Enable/Disable the validation of HL7 Payloads
+     * Enable/Disable the buffering of HL7 payloads before writing to the socket.
      *
      * @deprecated the parameter will be ignored
      *
@@ -532,7 +590,10 @@ public class MllpConfiguration implements Cloneable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(backlog,
+        return Objects.hash(bridgeErrorHandler,
+            exchangePattern,
+            synchronous,
+            backlog,
             bindTimeout,
             bindRetryInterval,
             acceptTimeout,
@@ -568,7 +629,10 @@ public class MllpConfiguration implements Cloneable {
 
         MllpConfiguration rhs = (MllpConfiguration) o;
 
-        return bindTimeout == rhs.bindTimeout
+        return bridgeErrorHandler == rhs.bridgeErrorHandler
+            && exchangePattern == rhs.exchangePattern
+            && synchronous == rhs.synchronous
+            && bindTimeout == rhs.bindTimeout
             && bindRetryInterval == rhs.bindRetryInterval
             && acceptTimeout == rhs.acceptTimeout
             && connectTimeout == rhs.connectTimeout
@@ -595,7 +659,10 @@ public class MllpConfiguration implements Cloneable {
     @Override
     public String toString() {
         return "MllpConfiguration{"
-            + "backlog=" + backlog
+            + "bridgeErrorHandler=" + bridgeErrorHandler
+            + ", exchangePattern=" + exchangePattern
+            + ", synchronous=" + synchronous
+            + ", backlog=" + backlog
             + ", bindTimeout=" + bindTimeout
             + ", bindRetryInterval=" + bindRetryInterval
             + ", acceptTimeout=" + acceptTimeout
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index 6bffef5..18db7f2 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -90,6 +90,10 @@ public class MllpEndpoint extends DefaultEndpoint {
     public MllpEndpoint(String uri, MllpComponent component, MllpConfiguration configuration) {
         super(uri, component);
         this.configuration = configuration.copy();
+
+        super.setBridgeErrorHandler(configuration.isBridgeErrorHandler());
+        super.setExchangePattern(configuration.getExchangePattern());
+        super.setSynchronous(configuration.isSynchronous());
     }
 
     @Override
@@ -107,13 +111,21 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     @Override
-    public ExchangePattern getExchangePattern() {
-        return ExchangePattern.InOut;
+    public void setExchangePattern(ExchangePattern exchangePattern) {
+        configuration.setExchangePattern(exchangePattern);
+        super.setExchangePattern(configuration.getExchangePattern());
     }
 
     @Override
-    public boolean isSynchronous() {
-        return true;
+    public void setSynchronous(boolean synchronous) {
+        configuration.setSynchronous(synchronous);
+        super.setSynchronous(configuration.isSynchronous());
+    }
+
+    @Override
+    public void setBridgeErrorHandler(boolean bridgeErrorHandler) {
+        configuration.setBridgeErrorHandler(bridgeErrorHandler);
+        super.setBridgeErrorHandler(configuration.isBridgeErrorHandler());
     }
 
     private void setExchangeProperties(Exchange mllpExchange) {
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index b0cdcfa..cdb56a1 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -17,11 +17,16 @@
 
 package org.apache.camel.component.mllp;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.BindException;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -31,16 +36,23 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.mllp.internal.Hl7Util;
 import org.apache.camel.component.mllp.internal.MllpSocketBuffer;
 import org.apache.camel.component.mllp.internal.TcpServerAcceptThread;
 import org.apache.camel.component.mllp.internal.TcpServerBindThread;
 import org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable;
 import org.apache.camel.component.mllp.internal.TcpSocketConsumerRunnable;
+import org.apache.camel.converter.IOConverter;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException;
+import org.apache.camel.util.IOHelper;
 
 /**
  * The MLLP consumer.
@@ -147,15 +159,6 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         super.doStart();
     }
 
-    @Override
-    public void handleException(Throwable t) {
-        super.handleException(t);
-    }
-
-    @Override
-    public void handleException(String message, Throwable t) {
-        super.handleException(message, t);
-    }
 
     @Override
     protected void doShutdown() throws Exception {
@@ -167,14 +170,25 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         validationExecutor.shutdownNow();
     }
 
+
+    public void handleMessageTimeout(String message, byte[] payload, Throwable cause) {
+        super.handleException(new MllpInvalidMessageException(message, payload, cause));
+    }
+
+    public void handleMessageException(String message, byte[] payload, Throwable cause) {
+        super.handleException(new MllpReceiveException(message, payload, cause));
+    }
+
     public MllpConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
 
+
     public Map<TcpSocketConsumerRunnable, Long> getConsumerRunnables() {
         return consumerRunnables;
     }
 
+
     public void validateConsumer(Socket clientSocket) {
         MllpSocketBuffer mllpBuffer = new MllpSocketBuffer(getEndpoint());
         TcpServerConsumerValidationRunnable client = new TcpServerConsumerValidationRunnable(this, clientSocket, mllpBuffer);
@@ -206,4 +220,336 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         }
     }
 
+    /**
+     * Wraps a received HL7 payload in an InOut exchange, hands it to the processor and then sends the acknowledgement.
+     */
+    public void processMessage(byte[] hl7MessageBytes, TcpSocketConsumerRunnable consumerRunnable) {
+        getEndpoint().updateLastConnectionActivityTicks();
+
+        // Send the message on to Camel for processing and wait for the response
+        log.debug("Populating the exchange with received message");
+        final Exchange camelExchange = getEndpoint().createExchange(ExchangePattern.InOut);
+        // TODO: Evaluate the CHARSET handling - may not be correct
+        camelExchange.setProperty(Exchange.CHARSET_NAME, getEndpoint().determineCharset(hl7MessageBytes, null));
+        try {
+            createUoW(camelExchange);
+            final Message inMessage = camelExchange.getIn();
+            if (consumerRunnable.hasLocalAddress()) {
+                inMessage.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, consumerRunnable.getLocalAddress());
+            }
+            if (consumerRunnable.hasRemoteAddress()) {
+                inMessage.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, consumerRunnable.getRemoteAddress());
+            }
+            // When the message already has headers but no auto-acknowledge header, default it from the configuration
+            if (inMessage.hasHeaders() && inMessage.getHeader(MllpConstants.MLLP_AUTO_ACKNOWLEDGE) == null) {
+                inMessage.setHeader(MllpConstants.MLLP_AUTO_ACKNOWLEDGE, getConfiguration().isAutoAck());
+            }
+
+            if (getConfiguration().isValidatePayload()) {
+                final String validationFailure = Hl7Util.generateInvalidPayloadExceptionMessage(hl7MessageBytes);
+                if (validationFailure != null) {
+                    camelExchange.setException(new MllpInvalidMessageException(validationFailure, hl7MessageBytes));
+                }
+            }
+            populateHl7DataHeaders(camelExchange, inMessage, hl7MessageBytes);
+
+            // The body is set after the headers because the String conversion reads the MLLP_CHARSET header
+            if (!getConfiguration().isStringPayload()) {
+                inMessage.setBody(hl7MessageBytes, byte[].class);
+            } else if (hl7MessageBytes == null || hl7MessageBytes.length == 0) {
+                inMessage.setBody("", String.class);
+            } else {
+                inMessage.setBody(getEndpoint().createNewString(hl7MessageBytes, inMessage.getHeader(MllpConstants.MLLP_CHARSET, String.class)), String.class);
+            }
+
+            log.debug("Calling processor");
+            try {
+                getProcessor().process(camelExchange);
+                sendAcknowledgement(hl7MessageBytes, camelExchange, consumerRunnable);
+            } catch (Exception processingEx) {
+                getExceptionHandler().handleException("Unexpected exception processing exchange", camelExchange, processingEx);
+            }
+        } catch (Exception setupEx) {
+            getExceptionHandler().handleException("Unexpected exception creating Unit of Work", camelExchange, setupEx);
+        } finally {
+            if (camelExchange != null) {
+                doneUoW(camelExchange);
+            }
+        }
+    }
+
+
+    /**
+     * Populates the MLLP message headers from the fields of the MSH segment of the received HL7 message.
+     * Does nothing when the exchange is null or already has an exception, or when fewer than 8 bytes were supplied;
+     * headers are only set when isHl7Headers() is enabled in the configuration.
+     */
+    void populateHl7DataHeaders(Exchange exchange, Message message, byte[] hl7MessageBytes) {
+        if (exchange != null && exchange.getException() == null) {
+            if (hl7MessageBytes == null || hl7MessageBytes.length < 8) {
+                // Not enough data to populate anything - just return
+                return;
+            }
+            // Find the end of the MSH and indexes of the fields in the MSH to populate message headers
+            final byte fieldSeparator = hl7MessageBytes[3];
+            int endOfMSH = -1;
+            List<Integer> fieldSeparatorIndexes = new ArrayList<>(10);  // We should have at least 10 fields
+            for (int i = 0; i < hl7MessageBytes.length; ++i) {
+                if (fieldSeparator == hl7MessageBytes[i]) {
+                    fieldSeparatorIndexes.add(i);
+                } else if (MllpProtocolConstants.SEGMENT_DELIMITER == hl7MessageBytes[i]) {
+                    // If the MSH Segment doesn't have a trailing field separator, add one so the field can be extracted into a header (i > 0: a delimiter in the first byte has no preceding byte)
+                    if (i > 0 && fieldSeparator != hl7MessageBytes[i - 1]) {
+                        fieldSeparatorIndexes.add(i);
+                    }
+                    endOfMSH = i;
+                    break;
+                }
+            }
+
+            if (-1 == endOfMSH) {
+                // TODO:  May want to throw some sort of an Exception here
+                log.error("Population of message headers failed - unable to find the end of the MSH segment");
+            } else if (getConfiguration().isHl7Headers()) {
+                log.debug("Populating the HL7 message headers");
+                Charset charset = Charset.forName(IOHelper.getCharsetName(exchange));
+                for (int i = 2; i < fieldSeparatorIndexes.size(); ++i) {
+                    int startingFieldSeparatorIndex = fieldSeparatorIndexes.get(i - 1);
+                    int endingFieldSeparatorIndex = fieldSeparatorIndexes.get(i);
+                    // Only populate the header if there's data in the HL7 field
+                    if (endingFieldSeparatorIndex - startingFieldSeparatorIndex > 1) {
+                        String headerName = null;
+                        switch (i) {
+                        case 2: // MSH-3
+                            headerName = MllpConstants.MLLP_SENDING_APPLICATION;
+                            break;
+                        case 3: // MSH-4
+                            headerName = MllpConstants.MLLP_SENDING_FACILITY;
+                            break;
+                        case 4: // MSH-5
+                            headerName = MllpConstants.MLLP_RECEIVING_APPLICATION;
+                            break;
+                        case 5: // MSH-6
+                            headerName = MllpConstants.MLLP_RECEIVING_FACILITY;
+                            break;
+                        case 6: // MSH-7
+                            headerName = MllpConstants.MLLP_TIMESTAMP;
+                            break;
+                        case 7: // MSH-8
+                            headerName = MllpConstants.MLLP_SECURITY;
+                            break;
+                        case 8: // MSH-9
+                            headerName = MllpConstants.MLLP_MESSAGE_TYPE;
+                            break;
+                        case 9: // MSH-10
+                            headerName = MllpConstants.MLLP_MESSAGE_CONTROL;
+                            break;
+                        case 10: // MSH-11
+                            headerName = MllpConstants.MLLP_PROCESSING_ID;
+                            break;
+                        case 11: // MSH-12
+                            headerName = MllpConstants.MLLP_VERSION_ID;
+                            break;
+                        case 17: // MSH-18
+                            headerName = MllpConstants.MLLP_CHARSET;
+                            break;
+                        default:
+                            // Not processing this field
+                            continue;
+                        }
+
+                        String headerValue = new String(hl7MessageBytes, startingFieldSeparatorIndex + 1,
+                            endingFieldSeparatorIndex - startingFieldSeparatorIndex - 1, charset);
+                        message.setHeader(headerName, headerValue);
+
+                        // For MSH-9, set a couple more headers
+                        if (i == 8) {
+                            String componentSeparator = new String(hl7MessageBytes, 4, 1, charset);
+                            String[] components = headerValue.split(String.format("\\Q%s\\E", componentSeparator), 3);
+                            message.setHeader(MllpConstants.MLLP_EVENT_TYPE, components[0]);
+                            if (2 <= components.length) {
+                                message.setHeader(MllpConstants.MLLP_TRIGGER_EVENT, components[1]);
+                            }
+                        }
+                    }
+                }
+            } else {
+                log.trace("HL7 Message headers disabled");
+            }
+        }
+    }
+
+
+    /**
+     * Sends the HL7 acknowledgement for a received message to the socket of the given consumer runnable.
+     * An acknowledgement found in the MLLP_ACKNOWLEDGEMENT or MLLP_ACKNOWLEDGEMENT_STRING exchange property is sent when it
+     * converts to byte[]; otherwise an AA, AE or AR acknowledgement is generated unless automatic acknowledgement is disabled.
+     */
+    void sendAcknowledgement(byte[] originalHl7MessageBytes, Exchange exchange, TcpSocketConsumerRunnable consumerRunnable) {
+        log.trace("entering sendAcknowledgement(byte[], Exchange)");
+
+        getEndpoint().checkBeforeSendProperties(exchange, consumerRunnable.getSocket(), log);
+
+        // Find the acknowledgement body
+        byte[] acknowledgementMessageBytes = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT, byte[].class);
+        if (acknowledgementMessageBytes == null) {
+            acknowledgementMessageBytes = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, byte[].class);
+        }
+
+        String acknowledgementMessageType = null;
+        if (null == acknowledgementMessageBytes) {
+            boolean autoAck = exchange.getProperty(MllpConstants.MLLP_AUTO_ACKNOWLEDGE, true, boolean.class);
+            if (!autoAck) {
+                Object acknowledgementBytesProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT);
+                Object acknowledgementStringProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING);
+                if (acknowledgementBytesProperty == null && acknowledgementStringProperty == null) {
+                    final String exceptionMessage = "Automatic Acknowledgement is disabled and the "
+                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + " and " + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + " exchange properties are null";
+                    exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes));
+                } else { // at least one property is set but not convertible; either one may still be null, so guard each getClass() call
+                    final String exceptionMessage = "Automatic Acknowledgement is disabled and neither the "
+                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + "(type = " + (acknowledgementBytesProperty == null ? "null" : acknowledgementBytesProperty.getClass().getSimpleName()) + ") nor the "
+                        + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + "(type = " + (acknowledgementStringProperty == null ? "null" : acknowledgementStringProperty.getClass().getSimpleName()) + ") exchange properties can be converted to byte[]";
+                    exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes));
+                }
+            } else {
+                String acknowledgmentTypeProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
+                try {
+                    if (null == acknowledgmentTypeProperty) {
+                        if (null == exchange.getException()) {
+                            acknowledgementMessageType = "AA";
+                        } else {
+                            acknowledgementMessageType = "AE";
+                        }
+                    } else {
+                        switch (acknowledgmentTypeProperty) {
+                        case "AA":
+                            acknowledgementMessageType = "AA";
+                            break;
+                        case "AE":
+                            acknowledgementMessageType = "AE";
+                            break;
+                        case "AR":
+                            acknowledgementMessageType = "AR";
+                            break;
+                        default:
+                            exchange.setException(new Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + acknowledgmentTypeProperty));
+                            return;
+                        }
+                    }
+
+                    Hl7Util.generateAcknowledgementPayload(consumerRunnable.getMllpBuffer(), originalHl7MessageBytes, acknowledgementMessageType);
+
+                } catch (Hl7AcknowledgementGenerationException ackGenerationException) {
+                    exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, ackGenerationException);
+                    exchange.setException(ackGenerationException);
+                }
+            }
+        } else {
+            consumerRunnable.getMllpBuffer().setEnvelopedMessage(acknowledgementMessageBytes);
+
+            final byte bM = 77;
+            final byte bS = 83;
+            final byte bA = 65;
+            final byte bE = 69;
+            final byte bR = 82;
+            // Field separator of the acknowledgement itself; its value is irrelevant when the acknowledgement is too short to hold an MSA segment
+            final byte fieldSeparator = acknowledgementMessageBytes.length > 3 ? acknowledgementMessageBytes[3] : 0;
+            // Acknowledgment is specified in exchange property - determine the acknowledgement type from the MSA segment of that acknowledgement
+            for (int i = 0; i < acknowledgementMessageBytes.length; ++i) {
+                if (MllpProtocolConstants.SEGMENT_DELIMITER == acknowledgementMessageBytes[i]) {
+                    if (i + 7 < acknowledgementMessageBytes.length // Make sure we don't run off the end of the message
+                        && bM == acknowledgementMessageBytes[i + 1] && bS == acknowledgementMessageBytes[i + 2]
+                        && bA == acknowledgementMessageBytes[i + 3] && fieldSeparator == acknowledgementMessageBytes[i + 4]) {
+                        if (fieldSeparator != acknowledgementMessageBytes[i + 7]) {
+                            log.warn("MSA-1 is longer than 2-bytes - ignoring trailing bytes");
+                        }
+                        // Found MSA - pull acknowledgement bytes
+                        byte[] acknowledgmentTypeBytes = new byte[2];
+                        acknowledgmentTypeBytes[0] = acknowledgementMessageBytes[i + 5];
+                        acknowledgmentTypeBytes[1] = acknowledgementMessageBytes[i + 6];
+                        try {
+                            acknowledgementMessageType = IOConverter.toString(acknowledgmentTypeBytes, exchange);
+                        } catch (IOException ioEx) {
+                            throw new RuntimeException("Failed to convert acknowledgement message to string", ioEx);
+                        }
+
+                        // Verify it's a valid acknowledgement code: 'A' followed by 'A', 'E' or 'R'
+                        final byte secondTypeByte = acknowledgmentTypeBytes[1];
+                        final boolean validAcknowledgementType = bA == acknowledgmentTypeBytes[0]
+                            && (bA == secondTypeByte || bE == secondTypeByte || bR == secondTypeByte);
+                        if (!validAcknowledgementType) {
+                            log.warn("Invalid acknowledgement type [" + acknowledgementMessageType + "] found in message - should be AA, AE or AR");
+                        }
+
+                        // if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on the exchange, make sure it matches
+                        String acknowledgementTypeProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
+                        if (null != acknowledgementTypeProperty && !acknowledgementTypeProperty.equals(acknowledgementMessageType)) {
+                            log.warn("Acknowledgement type found in message [" + acknowledgementMessageType + "] does not match "
+                                + MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE + " exchange property value [" + acknowledgementTypeProperty + "] - using value found in message");
+                        }
+                    }
+                }
+            }
+        }
+
+        Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        if (acknowledgementMessageType != null && !acknowledgementMessageType.isEmpty()) {
+            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType);
+        }
+
+        if (consumerRunnable.getMllpBuffer().hasCompleteEnvelope()) {
+            // The mllpBuffer will be used if bufferWrites is set or if auto acknowledgement is used
+            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, consumerRunnable.getMllpBuffer().toMllpPayload());
+            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, consumerRunnable.getMllpBuffer().toHl7String(IOHelper.getCharsetName(exchange, false)));
+
+            // Send the acknowledgement
+            if (log.isDebugEnabled()) {
+                log.debug("Sending Acknowledgement: {}", consumerRunnable.getMllpBuffer().toPrintFriendlyHl7String());
+            }
+
+            try {
+                consumerRunnable.getMllpBuffer().writeTo(consumerRunnable.getSocket());
+            } catch (MllpSocketException acknowledgementDeliveryEx) {
+                Exception exchangeEx = new MllpAcknowledgementDeliveryException("Failure delivering acknowledgment", originalHl7MessageBytes, acknowledgementMessageBytes, acknowledgementDeliveryEx);
+                exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, acknowledgementDeliveryEx);
+                exchange.setException(exchangeEx);
+            } finally {
+                consumerRunnable.getMllpBuffer().reset();
+            }
+        } else if (acknowledgementMessageBytes != null && acknowledgementMessageBytes.length > 0) {
+            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes);
+            String acknowledgementMessageString = "";
+            String exchangeCharset = IOHelper.getCharsetName(exchange, false);
+            if (exchangeCharset != null && !exchangeCharset.isEmpty()) {
+                try {
+                    acknowledgementMessageString = new String(acknowledgementMessageBytes, exchangeCharset);
+                } catch (UnsupportedEncodingException e) {
+                    log.warn("Failed to covert acknowledgment to string using {} charset - falling back to default charset {}", exchangeCharset, MllpProtocolConstants.DEFAULT_CHARSET);
+                    acknowledgementMessageString = new String(acknowledgementMessageBytes, MllpProtocolConstants.DEFAULT_CHARSET);
+                }
+            } else {
+                acknowledgementMessageString = new String(acknowledgementMessageBytes, MllpProtocolConstants.DEFAULT_CHARSET);
+            }
+            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, acknowledgementMessageString);
+
+            // Send the acknowledgement
+            if (log.isDebugEnabled()) {
+                log.debug("Sending Acknowledgement: {}", Hl7Util.convertToPrintFriendlyString(acknowledgementMessageBytes));
+            }
+
+            try {
+                consumerRunnable.getMllpBuffer().setEnvelopedMessage(acknowledgementMessageBytes);
+                consumerRunnable.getMllpBuffer().writeTo(consumerRunnable.getSocket());
+            } catch (MllpSocketException acknowledgementDeliveryEx) {
+                Exception exchangeEx = new MllpAcknowledgementDeliveryException("Failure delivering acknowledgment", originalHl7MessageBytes, acknowledgementMessageBytes, acknowledgementDeliveryEx);
+                exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, acknowledgementDeliveryEx);
+                exchange.setException(exchangeEx);
+            }
+        }
+
+        getEndpoint().checkAfterSendProperties(exchange, consumerRunnable.getSocket(), log);
+    }
+
+
 }
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java
index 893cf9e..740c1b9 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java
@@ -53,11 +53,10 @@ public class TcpServerBindThread extends Thread {
 
 
     /**
-     * Do the initial read on the Socket and try to determine if it has HL7 data, junk, or nothing.
+     * Bind the TCP ServerSocket within the specified timeout.
      */
     @Override
     public void run() {
-
         MDC.put(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID, consumer.getEndpoint().getCamelContext().getName());
 
         Route route = consumer.getRoute();
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
index 8940867..3e1c44d 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
@@ -133,343 +133,7 @@ public class TcpSocketConsumerRunnable implements Runnable {
         return String.format("%s[%s] - %s", this.getClass().getSimpleName(), endpointKey, combinedAddress);
     }
 
-    private void sendAcknowledgement(byte[] originalHl7MessageBytes, Exchange exchange) {
-        log.trace("entering sendAcknowledgement(byte[], Exchange)");
 
-        consumer.getEndpoint().checkBeforeSendProperties(exchange, clientSocket, log);
-
-        // Find the acknowledgement body
-        byte[] acknowledgementMessageBytes = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT, byte[].class);
-        if (acknowledgementMessageBytes == null) {
-            acknowledgementMessageBytes = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, byte[].class);
-        }
-
-        String acknowledgementMessageType = null;
-        if (null == acknowledgementMessageBytes) {
-
-            boolean autoAck = exchange.getProperty(MllpConstants.MLLP_AUTO_ACKNOWLEDGE, true, boolean.class);
-            if (!autoAck) {
-                Object acknowledgementBytesProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT);
-                Object acknowledgementStringProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING);
-                if (acknowledgementBytesProperty == null && acknowledgementStringProperty == null) {
-                    final String exceptionMessage = "Automatic Acknowledgement is disabled and the "
-                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + " and " + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + " exchange properties are null";
-                    exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes));
-                } else {
-                    final String exceptionMessage = "Automatic Acknowledgement is disabled and neither the "
-                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + "(type = " + acknowledgementBytesProperty.getClass().getSimpleName() + ") nor  the"
-                        + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + "(type = " + acknowledgementBytesProperty.getClass().getSimpleName() + ") exchange properties can be converted to byte[]";
-                    exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes));
-                }
-            } else {
-                String acknowledgmentTypeProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
-                try {
-                    if (null == acknowledgmentTypeProperty) {
-                        if (null == exchange.getException()) {
-                            acknowledgementMessageType = "AA";
-                        } else {
-                            acknowledgementMessageType = "AE";
-                        }
-                    } else {
-                        switch (acknowledgmentTypeProperty) {
-                        case "AA":
-                            acknowledgementMessageType = "AA";
-                            break;
-                        case "AE":
-                            acknowledgementMessageType = "AE";
-                            break;
-                        case "AR":
-                            acknowledgementMessageType = "AR";
-                            break;
-                        default:
-                            exchange.setException(new Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + acknowledgmentTypeProperty));
-                            return;
-                        }
-                    }
-
-                    Hl7Util.generateAcknowledgementPayload(mllpBuffer, originalHl7MessageBytes, acknowledgementMessageType);
-
-                } catch (Hl7AcknowledgementGenerationException ackGenerationException) {
-                    exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, ackGenerationException);
-                    exchange.setException(ackGenerationException);
-                }
-            }
-        } else {
-            mllpBuffer.setEnvelopedMessage(acknowledgementMessageBytes);
-
-            final byte bM = 77;
-            final byte bS = 83;
-            final byte bA = 65;
-            final byte bE = 69;
-            final byte bR = 82;
-
-            final byte fieldSeparator = originalHl7MessageBytes[3];
-            // Acknowledgment is specified in exchange property - determine the acknowledgement type
-            for (int i = 0; i < originalHl7MessageBytes.length; ++i) {
-                if (MllpProtocolConstants.SEGMENT_DELIMITER == i) {
-                    if (i + 7 < originalHl7MessageBytes.length // Make sure we don't run off the end of the message
-                        && bM == originalHl7MessageBytes[i + 1] && bS == originalHl7MessageBytes[i + 2]
-                        && bA == originalHl7MessageBytes[i + 3] && fieldSeparator == originalHl7MessageBytes[i + 4]) {
-                        if (fieldSeparator != originalHl7MessageBytes[i + 7]) {
-                            log.warn("MSA-1 is longer than 2-bytes - ignoring trailing bytes");
-                        }
-                        // Found MSA - pull acknowledgement bytes
-                        byte[] acknowledgmentTypeBytes = new byte[2];
-                        acknowledgmentTypeBytes[0] = originalHl7MessageBytes[i + 5];
-                        acknowledgmentTypeBytes[1] = originalHl7MessageBytes[i + 6];
-                        try {
-                            acknowledgementMessageType = IOConverter.toString(acknowledgmentTypeBytes, exchange);
-                        } catch (IOException ioEx) {
-                            throw new RuntimeException("Failed to convert acknowledgement message to string", ioEx);
-                        }
-
-                        // Verify it's a valid acknowledgement code
-                        if (bA != acknowledgmentTypeBytes[0]) {
-                            switch (acknowledgementMessageBytes[1]) {
-                            case bA:
-                            case bR:
-                            case bE:
-                                break;
-                            default:
-                                log.warn("Invalid acknowledgement type [" + acknowledgementMessageType + "] found in message - should be AA, AE or AR");
-                            }
-                        }
-
-                        // if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on the exchange, make sure it matches
-                        String acknowledgementTypeProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
-                        if (null != acknowledgementTypeProperty && !acknowledgementTypeProperty.equals(acknowledgementMessageType)) {
-                            log.warn("Acknowledgement type found in message [" + acknowledgementMessageType + "] does not match "
-                                + MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE + " exchange property value [" + acknowledgementTypeProperty + "] - using value found in message");
-                        }
-                    }
-                }
-            }
-        }
-
-        Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
-        if (acknowledgementMessageType != null && !acknowledgementMessageType.isEmpty()) {
-            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType);
-        }
-
-        if (mllpBuffer.hasCompleteEnvelope()) {
-            // The mllpBuffer will be used if bufferWrites is set or if auto acknowledgement is used
-            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, mllpBuffer.toMllpPayload());
-            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, mllpBuffer.toHl7String(IOHelper.getCharsetName(exchange, false)));
-
-            // Send the acknowledgement
-            if (log.isDebugEnabled()) {
-                log.debug("Sending Acknowledgement: {}", mllpBuffer.toPrintFriendlyHl7String());
-            }
-
-            try {
-                mllpBuffer.writeTo(clientSocket);
-            } catch (MllpSocketException acknowledgementDeliveryEx) {
-                Exception exchangeEx = new MllpAcknowledgementDeliveryException("Failure delivering acknowledgment", originalHl7MessageBytes, acknowledgementMessageBytes, acknowledgementDeliveryEx);
-                exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, acknowledgementDeliveryEx);
-                exchange.setException(exchangeEx);
-            } finally {
-                mllpBuffer.reset();
-            }
-        } else if (acknowledgementMessageBytes != null && acknowledgementMessageBytes.length > 0) {
-            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes);
-            String acknowledgementMessageString = "";
-            String exchangeCharset = IOHelper.getCharsetName(exchange, false);
-            if (exchangeCharset != null && !exchangeCharset.isEmpty()) {
-                try {
-                    acknowledgementMessageString = new String(acknowledgementMessageBytes, exchangeCharset);
-                } catch (UnsupportedEncodingException e) {
-                    log.warn("Failed to covert acknowledgment to string using {} charset - falling back to default charset {}", exchange, MllpProtocolConstants.DEFAULT_CHARSET);
-                    acknowledgementMessageString = new String(acknowledgementMessageBytes, MllpProtocolConstants.DEFAULT_CHARSET);
-                }
-            } else {
-                acknowledgementMessageString = new String(acknowledgementMessageBytes, MllpProtocolConstants.DEFAULT_CHARSET);
-            }
-            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, acknowledgementMessageString);
-
-            // Send the acknowledgement
-            if (log.isDebugEnabled()) {
-                log.debug("Sending Acknowledgement: {}", Hl7Util.convertToPrintFriendlyString(acknowledgementMessageBytes));
-            }
-
-            try {
-                mllpBuffer.setEnvelopedMessage(acknowledgementMessageBytes);
-                mllpBuffer.writeTo(clientSocket);
-            } catch (MllpSocketException acknowledgementDeliveryEx) {
-                Exception exchangeEx = new MllpAcknowledgementDeliveryException("Failure delivering acknowledgment", originalHl7MessageBytes, acknowledgementMessageBytes, acknowledgementDeliveryEx);
-                exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, acknowledgementDeliveryEx);
-                exchange.setException(exchangeEx);
-            }
-        }
-
-        consumer.getEndpoint().checkAfterSendProperties(exchange, clientSocket, log);
-    }
-
-    private void populateHl7DataHeaders(Exchange exchange, Message message, byte[] hl7MessageBytes) {
-        if (exchange != null && exchange.getException() == null) {
-            if (hl7MessageBytes == null || hl7MessageBytes.length < 8) {
-                // Not enough data to populate anything - just return
-                return;
-            }
-            // Find the end of the MSH and indexes of the fields in the MSH to populate message headers
-            final byte fieldSeparator = hl7MessageBytes[3];
-            int endOfMSH = -1;
-            List<Integer> fieldSeparatorIndexes = new ArrayList<>(10);  // We should have at least 10 fields
-
-            for (int i = 0; i < hl7MessageBytes.length; ++i) {
-                if (fieldSeparator == hl7MessageBytes[i]) {
-                    fieldSeparatorIndexes.add(i);
-                } else if (MllpProtocolConstants.SEGMENT_DELIMITER == hl7MessageBytes[i]) {
-                    // If the MSH Segment doesn't have a trailing field separator, add one so the field can be extracted into a header
-                    if (fieldSeparator != hl7MessageBytes[i - 1]) {
-                        fieldSeparatorIndexes.add(i);
-                    }
-                    endOfMSH = i;
-                    break;
-                }
-            }
-
-            if (-1 == endOfMSH) {
-                // TODO:  May want to throw some sort of an Exception here
-                log.error("Population of message headers failed - unable to find the end of the MSH segment");
-            } else if (consumer.getConfiguration().isHl7Headers()) {
-                log.debug("Populating the HL7 message headers");
-                Charset charset = Charset.forName(IOHelper.getCharsetName(exchange));
-
-                for (int i = 2; i < fieldSeparatorIndexes.size(); ++i) {
-                    int startingFieldSeparatorIndex = fieldSeparatorIndexes.get(i - 1);
-                    int endingFieldSeparatorIndex = fieldSeparatorIndexes.get(i);
-
-                    // Only populate the header if there's data in the HL7 field
-                    if (endingFieldSeparatorIndex - startingFieldSeparatorIndex > 1) {
-                        String headerName = null;
-                        switch (i) {
-                        case 2: // MSH-3
-                            headerName = MllpConstants.MLLP_SENDING_APPLICATION;
-                            break;
-                        case 3: // MSH-4
-                            headerName = MllpConstants.MLLP_SENDING_FACILITY;
-                            break;
-                        case 4: // MSH-5
-                            headerName = MllpConstants.MLLP_RECEIVING_APPLICATION;
-                            break;
-                        case 5: // MSH-6
-                            headerName = MllpConstants.MLLP_RECEIVING_FACILITY;
-                            break;
-                        case 6: // MSH-7
-                            headerName = MllpConstants.MLLP_TIMESTAMP;
-                            break;
-                        case 7: // MSH-8
-                            headerName = MllpConstants.MLLP_SECURITY;
-                            break;
-                        case 8: // MSH-9
-                            headerName = MllpConstants.MLLP_MESSAGE_TYPE;
-                            break;
-                        case 9: // MSH-10
-                            headerName = MllpConstants.MLLP_MESSAGE_CONTROL;
-                            break;
-                        case 10: // MSH-11
-                            headerName = MllpConstants.MLLP_PROCESSING_ID;
-                            break;
-                        case 11: // MSH-12
-                            headerName = MllpConstants.MLLP_VERSION_ID;
-                            break;
-                        case 17: // MSH-18
-                            headerName = MllpConstants.MLLP_CHARSET;
-                            break;
-                        default:
-                            // Not processing this field
-                            continue;
-                        }
-
-                        String headerValue = new String(hl7MessageBytes, startingFieldSeparatorIndex + 1,
-                            endingFieldSeparatorIndex - startingFieldSeparatorIndex - 1,
-                            charset);
-                        message.setHeader(headerName, headerValue);
-
-                        // For MSH-9, set a couple more headers
-                        if (i == 8) {
-                            // final byte componentSeparator = hl7MessageBytes[4];
-                            String componentSeparator = new String(hl7MessageBytes, 4, 1, charset);
-                            String[] components = headerValue.split(String.format("\\Q%s\\E", componentSeparator), 3);
-                            message.setHeader(MllpConstants.MLLP_EVENT_TYPE, components[0]);
-                            if (2 <= components.length) {
-                                message.setHeader(MllpConstants.MLLP_TRIGGER_EVENT, components[1]);
-                            }
-                        }
-                    }
-                }
-            } else {
-                log.trace("HL7 Message headers disabled");
-            }
-        }
-    }
-
-
-    void processMessage(byte[] hl7MessageBytes) {
-        consumer.getEndpoint().updateLastConnectionActivityTicks();
-
-        // Send the message on to Camel for processing and wait for the response
-        log.debug("Populating the exchange with received message");
-        Exchange exchange = consumer.getEndpoint().createExchange(ExchangePattern.InOut);
-        // TODO: Evaluate the CHARSET handling - may not be correct
-        exchange.setProperty(Exchange.CHARSET_NAME, consumer.getEndpoint().determineCharset(hl7MessageBytes, null));
-        try {
-            consumer.createUoW(exchange);
-            Message message = exchange.getIn();
-
-            if (localAddress != null) {
-                message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, localAddress);
-            }
-
-            if (remoteAddress != null) {
-                message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, remoteAddress);
-            }
-            message.setHeader(MllpConstants.MLLP_AUTO_ACKNOWLEDGE, consumer.getConfiguration().isAutoAck());
-
-            if (consumer.getConfiguration().isValidatePayload()) {
-                String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(hl7MessageBytes);
-                if (exceptionMessage != null) {
-                    exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes));
-                }
-            }
-            populateHl7DataHeaders(exchange, message, hl7MessageBytes);
-
-            if (consumer.getConfiguration().isStringPayload()) {
-                if (hl7MessageBytes != null && hl7MessageBytes.length > 0) {
-                    message.setBody(consumer.getEndpoint().createNewString(hl7MessageBytes, message.getHeader(MllpConstants.MLLP_CHARSET, String.class)), String.class);
-                } else {
-                    message.setBody("", String.class);
-                }
-            } else {
-                message.setBody(hl7MessageBytes, byte[].class);
-            }
-
-            log.debug("Calling processor");
-            try {
-                consumer.getProcessor().process(exchange);
-                sendAcknowledgement(hl7MessageBytes, exchange);
-            } catch (RuntimeException runtimeEx) {
-                throw runtimeEx;
-            } catch (Exception ex) {
-                log.error("Unexpected exception processing exchange", ex);
-                exchange.setException(ex);
-            }
-        } catch (Exception uowEx) {
-            // TODO:  Handle this correctly
-            exchange.setException(uowEx);
-            log.warn("Exception encountered creating Unit of Work - sending exception to route", uowEx);
-            try {
-                consumer.getProcessor().process(exchange);
-            } catch (Exception e) {
-                log.error("Exception encountered processing exchange with exception encountered createing Unit of Work", e);
-            }
-            return;
-        } finally {
-            if (exchange != null) {
-                consumer.doneUoW(exchange);
-            }
-        }
-    }
 
     @Override
     public void run() {
@@ -493,7 +157,7 @@ public class TcpSocketConsumerRunnable implements Runnable {
                 // If we got a complete message on the validation read, process it
                 hl7MessageBytes = mllpBuffer.toMllpPayload();
                 mllpBuffer.reset();
-                processMessage(hl7MessageBytes);
+                consumer.processMessage(hl7MessageBytes, this);
             }
 
             while (running && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
@@ -512,7 +176,7 @@ public class TcpSocketConsumerRunnable implements Runnable {
                         }
                         mllpBuffer.reset();
 
-                        processMessage(hl7MessageBytes);
+                        consumer.processMessage(hl7MessageBytes, this);
                     } else if (!mllpBuffer.hasStartOfBlock()) {
                         byte[] payload = mllpBuffer.toByteArray();
                         log.warn("Ignoring {} byte un-enveloped payload {}", payload.length, Hl7Util.convertToPrintFriendlyString(payload));
@@ -535,12 +199,12 @@ public class TcpSocketConsumerRunnable implements Runnable {
                     } else {
                         mllpBuffer.resetSocket(clientSocket);
                         new MllpInvalidMessageException("Timeout receiving complete message payload", mllpBuffer.toByteArrayAndReset(), timeoutEx);
-                        consumer.handleException(new MllpInvalidMessageException("Timeout receiving complete message payload", mllpBuffer.toByteArrayAndReset(), timeoutEx));
+                        consumer.handleMessageTimeout("Timeout receiving complete message payload", mllpBuffer.toByteArrayAndReset(), timeoutEx);
                     }
                 } catch (MllpSocketException mllpSocketEx) {
                     mllpBuffer.resetSocket(clientSocket);
                     if (!mllpBuffer.isEmpty()) {
-                        consumer.handleException(new MllpReceiveException("Exception encountered reading payload", mllpBuffer.toByteArrayAndReset(), mllpSocketEx));
+                        consumer.handleMessageException("Exception encountered reading payload", mllpBuffer.toByteArrayAndReset(), mllpSocketEx);
                     } else {
                         log.warn("Ignoring exception encountered checking for data", mllpSocketEx);
                     }
@@ -560,6 +224,14 @@ public class TcpSocketConsumerRunnable implements Runnable {
         }
     }
 
+    public Socket getSocket() {
+        return clientSocket;
+    }
+
+    public MllpSocketBuffer getMllpBuffer() {
+        return mllpBuffer;
+    }
+
     public void closeSocket() {
         mllpBuffer.closeSocket(clientSocket);
     }
@@ -580,14 +252,26 @@ public class TcpSocketConsumerRunnable implements Runnable {
         running = false;
     }
 
+    public boolean hasLocalAddress() {
+        return localAddress != null && !localAddress.isEmpty();
+    }
+
     public String getLocalAddress() {
         return localAddress;
     }
 
+    public boolean hasRemoteAddress() {
+        return remoteAddress != null && !remoteAddress.isEmpty();
+    }
+
     public String getRemoteAddress() {
         return remoteAddress;
     }
 
+    public boolean hasCombinedAddress() {
+        return combinedAddress != null && !combinedAddress.isEmpty(); // non-null and non-empty, same as hasLocalAddress/hasRemoteAddress
+    }
+
     public String getCombinedAddress() {
         return combinedAddress;
     }
diff --git a/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java
index 0ff738a..a339584 100644
--- a/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.mllp.springboot;
 
 import javax.annotation.Generated;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.mllp.MllpComponent;
 import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
@@ -36,12 +38,12 @@ public class MllpComponentConfiguration
     /**
      * Set the component to log PHI data.
      */
-    private Boolean logPhi;
+    private Boolean logPhi = true;
     /**
      * Set the maximum number of bytes of PHI that will be logged in a log
      * entry.
      */
-    private Integer logPhiMaxBytes;
+    private Integer logPhiMaxBytes = 5120;
     /**
      * Sets the default configuration to use when creating MLLP endpoints.
      */
@@ -90,6 +92,30 @@ public class MllpComponentConfiguration
     public static class MllpConfigurationNestedConfiguration {
         public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.mllp.MllpConfiguration.class;
         /**
+         * Allows for bridging the consumer to the Camel routing Error Handler,
+         * which means any exceptions that occur while the consumer is trying
+         * to receive incoming messages, or the like, will now be processed as
+         * a message and handled by the routing Error Handler. If disabled, the
+         * consumer will use the org.apache.camel.spi.ExceptionHandler to deal
+         * with exceptions, which are logged at WARN or ERROR level and ignored.
+         * 
+         * @param bridgeErrorHandler
+         */
+        private Boolean bridgeErrorHandler = true;
+        /**
+         * Sets the exchange pattern when the consumer creates an exchange.
+         * 
+         * @param exchangePattern
+         */
+        private ExchangePattern exchangePattern = ExchangePattern.InOut;
+        /**
+         * Sets whether synchronous processing should be strictly used (this
+         * component only supports synchronous operations).
+         * 
+         * @param synchronous
+         */
+        private Boolean synchronous = true;
+        /**
          * Set the CamelCharsetName property on the exchange
          * 
          * @param charsetName
@@ -207,6 +233,14 @@ public class MllpComponentConfiguration
          */
         private Boolean tcpNoDelay = true;
         /**
+         * Enable/disable the SO_REUSEADDR socket option.
+         * 
+         * @param reuseAddress
+         *            enable SO_REUSEADDR when true; disable SO_REUSEADDR when
+         *            false; use system default when null
+         */
+        private Boolean reuseAddress = false;
+        /**
          * Sets the SO_RCVBUF option to the specified value (in bytes)
          * 
          * @param receiveBufferSize
@@ -239,7 +273,7 @@ public class MllpComponentConfiguration
          */
         private Boolean hl7Headers = true;
         /**
-         * Enable disable strict compliance to the MLLP standard. The MLLP
+         * Enable/Disable strict compliance to the MLLP standard. The MLLP
          * standard specifies [START_OF_BLOCK]hl7
          * payload[END_OF_BLOCK][END_OF_DATA], however, some systems do not send
          * the final END_OF_DATA byte. This setting controls whether or not the
@@ -276,7 +310,8 @@ public class MllpComponentConfiguration
          */
         private Boolean validatePayload = false;
         /**
-         * Enable/Disable the validation of HL7 Payloads
+         * Enable/Disable the buffering of HL7 payloads before writing to the
+         * socket.
          * 
          * @deprecated the parameter will be ignored
          * @param bufferWrites
@@ -285,6 +320,30 @@ public class MllpComponentConfiguration
         @Deprecated
         private Boolean bufferWrites = false;
 
+        public Boolean getBridgeErrorHandler() {
+            return bridgeErrorHandler;
+        }
+
+        public void setBridgeErrorHandler(Boolean bridgeErrorHandler) {
+            this.bridgeErrorHandler = bridgeErrorHandler;
+        }
+
+        public ExchangePattern getExchangePattern() {
+            return exchangePattern;
+        }
+
+        public void setExchangePattern(ExchangePattern exchangePattern) {
+            this.exchangePattern = exchangePattern;
+        }
+
+        public Boolean getSynchronous() {
+            return synchronous;
+        }
+
+        public void setSynchronous(Boolean synchronous) {
+            this.synchronous = synchronous;
+        }
+
         public String getCharsetName() {
             return charsetName;
         }
@@ -400,6 +459,14 @@ public class MllpComponentConfiguration
             this.tcpNoDelay = tcpNoDelay;
         }
 
+        public Boolean getReuseAddress() {
+            return reuseAddress;
+        }
+
+        public void setReuseAddress(Boolean reuseAddress) {
+            this.reuseAddress = reuseAddress;
+        }
+
         public Integer getReceiveBufferSize() {
             return receiveBufferSize;
         }

-- 
To stop receiving notification emails like this one, please contact
quinn@apache.org.