You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/16 21:59:15 UTC

[6/6] camel git commit: CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket - backport to 2.17

CAMEL-10511:  Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket - backport to 2.17


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a53540da
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a53540da
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a53540da

Branch: refs/heads/camel-2.17.x
Commit: a53540da1e5b7550355c68dc9e98c5e6e77c109a
Parents: 55c621f
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Fri Dec 16 13:41:12 2016 -0700
Committer: Quinn Stevenson <qu...@pronoia-solutions.com>
Committed: Fri Dec 16 13:41:12 2016 -0700

----------------------------------------------------------------------
 components/camel-mllp/pom.xml                   |  16 +
 .../MllpAcknowledgementDeliveryException.java   |  40 ++
 .../mllp/MllpAcknowledgementException.java      |  17 +-
 .../MllpAcknowledgementTimeoutException.java    |  69 +++
 .../MllpAcknowledgementTimoutException.java     |  38 --
 ...pplicationErrorAcknowledgementException.java |  19 +-
 ...plicationRejectAcknowledgementException.java |  19 +-
 ...MllpCommitErrorAcknowledgementException.java |  19 +-
 ...llpCommitRejectAcknowledgementException.java |  19 +-
 .../camel/component/mllp/MllpComponent.java     |  40 +-
 .../camel/component/mllp/MllpConstants.java     |   3 +
 .../mllp/MllpCorruptFrameException.java         |  40 --
 .../camel/component/mllp/MllpEndpoint.java      | 122 ++++-
 .../camel/component/mllp/MllpException.java     |  74 ++-
 .../component/mllp/MllpFrameException.java      |  41 ++
 .../MllpInvalidAcknowledgementException.java    |  16 +-
 .../mllp/MllpInvalidMessageException.java       |  30 ++
 .../MllpNegativeAcknowledgementException.java   |  32 ++
 .../MllpReceiveAcknowledgementException.java    |  57 ++
 .../component/mllp/MllpReceiveException.java    |  46 ++
 .../component/mllp/MllpTcpClientProducer.java   | 209 ++++----
 .../component/mllp/MllpTcpServerConsumer.java   | 398 ++++++++------
 .../component/mllp/MllpTimeoutException.java    |  40 +-
 .../component/mllp/MllpWriteException.java      |  17 +-
 .../camel/component/mllp/impl/Hl7Util.java      |  90 ++++
 .../mllp/impl/MllpBufferedSocketWriter.java     | 123 +++++
 .../component/mllp/impl/MllpSocketReader.java   | 290 ++++++++++
 .../component/mllp/impl/MllpSocketUtil.java     | 230 ++++++++
 .../component/mllp/impl/MllpSocketWriter.java   | 143 +++++
 .../camel/component/mllp/impl/MllpUtil.java     | 381 -------------
 .../mllp/Hl7AcknowledgementGenerator.java       |   9 +-
 .../camel/component/mllp/MllpExceptionTest.java | 114 ++++
 .../mllp/MllpProducerConsumerLoopbackTest.java  |  24 +-
 .../MllpTcpClientConsumerBlueprintTest.java     |  36 --
 ...llpTcpClientProducerAcknowledgementTest.java | 207 ++++++-
 ...ntProducerAcknowledgementValidationTest.java | 283 ++++++++++
 .../MllpTcpClientProducerBlueprintTest.java     |  14 +-
 ...llpTcpClientProducerConnectionErrorTest.java | 165 ++++++
 .../mllp/MllpTcpClientProducerTest.java         |  86 ++-
 ...llpTcpServerConsumerAcknowledgementTest.java |  98 +++-
 .../MllpTcpServerConsumerConnectionTest.java    | 136 ++++-
 ...MllpTcpServerConsumerMessageHeadersTest.java | 158 ++++++
 ...pTcpServerConsumerMessageValidationTest.java | 317 +++++++++++
 .../mllp/MllpTcpServerConsumerTest.java         | 130 ++++-
 .../MllpTcpServerConsumerTransactionTest.java   | 142 +++++
 .../MllpTcpServerProducerBlueprintTest.java     |  35 --
 .../camel/component/mllp/impl/Hl7UtilTest.java  | 126 +++++
 ...BufferedSocketAcknowledgementWriterTest.java | 125 +++++
 .../MllpBufferedSocketMessageWriterTest.java    | 126 +++++
 .../MllpSocketAcknowledgementReaderTest.java    | 533 +++++++++++++++++++
 .../MllpSocketAcknowledgementWriterTest.java    | 150 ++++++
 .../mllp/impl/MllpSocketMessageReaderTest.java  | 527 ++++++++++++++++++
 .../mllp/impl/MllpSocketMessageWriterTest.java  | 150 ++++++
 .../mllp/impl/MllpSocketReaderTestSupport.java  | 342 ++++++++++++
 .../MllpSocketUtilExceptionHandlingTest.java    | 133 +++++
 .../impl/MllpSocketUtilFindXxxOfBlockTest.java  | 241 +++++++++
 .../mllp/impl/MllpSocketUtilSocketTest.java     | 288 ++++++++++
 .../mllp/impl/MllpSocketWriterTestSupport.java  | 132 +++++
 .../junit/rule/mllp/MllpClientResource.java     |  51 +-
 .../junit/rule/mllp/MllpServerResource.java     | 511 ++++++++++++------
 .../apache/camel/test/util/PayloadBuilder.java  | 211 ++++++++
 .../blueprint/mllp-tcp-client-producer-test.xml |   2 +-
 62 files changed, 7080 insertions(+), 1200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mllp/pom.xml b/components/camel-mllp/pom.xml
index c9ca671..a56c363 100644
--- a/components/camel-mllp/pom.xml
+++ b/components/camel-mllp/pom.xml
@@ -56,6 +56,22 @@
       <artifactId>camel-test-blueprint</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-sjms</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq.tooling</groupId>
+      <artifactId>activemq-junit</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
 
     <!-- test logging -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
new file mode 100644
index 0000000..c6fc878
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer cannot deliver the MLLP Acknowledgement
+ */
+public class MllpAcknowledgementDeliveryException extends MllpAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "HL7 Acknowledgment Delivery Failed";
+
+    public MllpAcknowledgementDeliveryException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpAcknowledgementDeliveryException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
+    }
+
+    public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
index 624745c..296430b 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
@@ -20,20 +20,25 @@ package org.apache.camel.component.mllp;
  * Base class for HL7 Application Acknowledgement Exceptions
  */
 public abstract class MllpAcknowledgementException extends MllpException {
+
     public MllpAcknowledgementException(String message) {
         super(message);
     }
 
-    public MllpAcknowledgementException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
+    public MllpAcknowledgementException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
     }
 
-    public MllpAcknowledgementException(String message, Throwable cause) {
-        super(message, cause);
+    public MllpAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
     }
 
-    public MllpAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
+    public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
new file mode 100644
index 0000000..8cb2822
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
+ */
+public class MllpAcknowledgementTimeoutException extends MllpAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "Timeout receiving HL7 Acknowledgement";
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message) {
+        super(EXCEPTION_MESSAGE, hl7Message);
+    }
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement);
+    }
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, cause);
+    }
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement, cause);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+        super(message, hl7Message, partialHl7Acknowledgement);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, partialHl7Acknowledgement, cause);
+    }
+
+    /**
+     * Get the HL7 acknowledgement payload associated with this exception, if any.
+     *
+     * @return If the timeout occurred before any part of the HL7 Acknowledgement was received, this will be null.  If
+     * the timeout occurred while attempting to complete the read of an HL7 Acknowledgement (i.e. part of the
+     * acknowledgement has already been read), this will be the partial acknowledgement payload that was read before
+     * the timeout.
+     */
+    public byte[] getHl7Acknowledgement() {
+        return super.getHl7Acknowledgement();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
deleted file mode 100644
index 9288a14..0000000
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-/**
- * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
- */
-public class MllpAcknowledgementTimoutException extends MllpTimeoutException {
-    public MllpAcknowledgementTimoutException(String message) {
-        super(message);
-    }
-
-    public MllpAcknowledgementTimoutException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
-    }
-
-    public MllpAcknowledgementTimoutException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public MllpAcknowledgementTimoutException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
index 7d7ecd2..c8db47a 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Application Error Acknowledgement
  */
-public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledgementException {
-    public MllpApplicationErrorAcknowledgementException(String message) {
-        super(message);
-    }
+public class MllpApplicationErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "HL7 Application Error Acknowledgment Received";
 
-    public MllpApplicationErrorAcknowledgementException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
+    public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
     }
 
-    public MllpApplicationErrorAcknowledgementException(String message, Throwable cause) {
-        super(message, cause);
+    public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
 
-    public MllpApplicationErrorAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
+    @Override
+    public String getAcknowledgmentType() {
+        return "AE";
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
index ef2a39d..00dd3e4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Application Reject Acknowledgement
  */
-public class MllpApplicationRejectAcknowledgementException extends MllpAcknowledgementException {
-    public MllpApplicationRejectAcknowledgementException(String message) {
-        super(message);
-    }
+public class MllpApplicationRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "HL7 Application Reject Acknowledgment Received";
 
-    public MllpApplicationRejectAcknowledgementException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
+    public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
     }
 
-    public MllpApplicationRejectAcknowledgementException(String message, Throwable cause) {
-        super(message, cause);
+    public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
 
-    public MllpApplicationRejectAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
+    @Override
+    public String getAcknowledgmentType() {
+        return "AR";
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
index 1269c56..bec34d9 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Commit Error Acknowledgement
  */
-public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgementException {
-    public MllpCommitErrorAcknowledgementException(String message) {
-        super(message);
-    }
+public class MllpCommitErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "HL7 Commit Error Acknowledgment Received";
 
-    public MllpCommitErrorAcknowledgementException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
+    public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
     }
 
-    public MllpCommitErrorAcknowledgementException(String message, Throwable cause) {
-        super(message, cause);
+    public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
 
-    public MllpCommitErrorAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
+    @Override
+    public String getAcknowledgmentType() {
+        return "CE";
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
index 99043a9..f98ee5e 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Commit Reject Acknowledgement
  */
-public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgementException {
-    public MllpCommitRejectAcknowledgementException(String message) {
-        super(message);
-    }
+public class MllpCommitRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "HL7 Commit Reject Acknowledgment Received";
 
-    public MllpCommitRejectAcknowledgementException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
+    public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
     }
 
-    public MllpCommitRejectAcknowledgementException(String message, Throwable cause) {
-        super(message, cause);
+    public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
 
-    public MllpCommitRejectAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
+    @Override
+    public String getAcknowledgmentType() {
+        return "CR";
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
index 3a67dd3..f3e22b1 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
@@ -22,11 +22,15 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
 /**
  * Represents the component that manages {@link MllpEndpoint}.
  */
 public class MllpComponent extends UriEndpointComponent {
-    public static final String MLLP_LOG_PHI_PROPERTY = "org.apache.camel.mllp.logPHI";
+    public static final String MLLP_LOG_PHI_PROPERTY = "org.apache.camel.component.mllp.logPHI";
 
     public MllpComponent() {
         super(MllpEndpoint.class);
@@ -64,4 +68,38 @@ public class MllpComponent extends UriEndpointComponent {
         return endpoint;
     }
 
+    public static boolean isLogPhi() {
+        String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+        return Boolean.valueOf(logPhiProperty);
+    }
+
+    public static String covertToPrintFriendlyString(String hl7Message) {
+        if (hl7Message == null) {
+            return "null";
+        } else if (hl7Message.isEmpty()) {
+            return "empty";
+        }
+
+        return hl7Message.replaceAll("" + START_OF_BLOCK, "<VT>").replaceAll("" + END_OF_BLOCK, "<FS>").replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
+    }
+
+    public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes) {
+        if (hl7Bytes == null) {
+            return "null";
+        } else if (hl7Bytes.length == 0) {
+            return "";
+        }
+
+        return covertBytesToPrintFriendlyString(hl7Bytes, 0, hl7Bytes.length);
+    }
+
+    public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes, int startPosition, int length) {
+        if (null == hl7Bytes) {
+            return "null";
+        } else if (hl7Bytes.length == 0) {
+            return "";
+        }
+        return covertToPrintFriendlyString(new String(hl7Bytes, startPosition, length));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
index da5eb77..4e14da0 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
@@ -21,8 +21,11 @@ public final class MllpConstants {
     public static final String MLLP_REMOTE_ADDRESS = "CamelMllpRemoteAddress";
 
     public static final String MLLP_ACKNOWLEDGEMENT = "CamelMllpAcknowledgement";
+    public static final String MLLP_ACKNOWLEDGEMENT_STRING = "CamelMllpAcknowledgementString";
     public static final String MLLP_ACKNOWLEDGEMENT_TYPE = "CamelMllpAcknowledgementType";
 
+    public static final String MLLP_ACKNOWLEDGEMENT_EXCEPTION = "CamelMllpAcknowledgementException";
+    public static final String MLLP_AUTO_ACKNOWLEDGE = "CamelMllpAutoAcknowledge";
     /*
      Connection Control Exchange Properties
       - For Consumers, "SEND" => ACKNOWLEDGEMENT

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java
deleted file mode 100644
index 0310530..0000000
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-/**
- * Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting
- * to read or write a MLLP payload.
- */
-public class MllpCorruptFrameException extends MllpException {
-    public MllpCorruptFrameException(String message) {
-        super(message);
-    }
-
-    public MllpCorruptFrameException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
-    }
-
-    public MllpCorruptFrameException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public MllpCorruptFrameException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index 81a2e97..3732254 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -30,14 +30,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Represents a MLLP endpoint.
+ * The MLLP component is designed to handle the MLLP protocol and provide the functionality required by Healthcare providers to communicate with other systems using the MLLP protocol.
+ *
  * <p/>
  * NOTE: MLLP payloads are not logged unless the logging level is set to DEBUG or TRACE to avoid introducing PHI
  * into the log files.  Logging of PHI can be globally disabled by setting the org.apache.camel.mllp.logPHI system
  * property to false.
  * <p/>
  */
-@UriEndpoint(scheme = "mllp", title = "mllp", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
+@UriEndpoint(scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
 public class MllpEndpoint extends DefaultEndpoint {
     public static final char START_OF_BLOCK = 0x0b;      // VT (vertical tab)        - decimal 11, octal 013
     public static final char END_OF_BLOCK = 0x1c;        // FS (file separator)      - decimal 28, octal 034
@@ -54,24 +55,30 @@ public class MllpEndpoint extends DefaultEndpoint {
     @UriPath @Metadata(required = "true")
     int port = -1;
 
-    @UriParam(defaultValue = "5")
+    @UriParam(label = "advanced", defaultValue = "5")
     int backlog = 5;
 
-    @UriParam(defaultValue = "30000")
+    @UriParam(label = "timeout", defaultValue = "30000")
     int bindTimeout = 30000;
 
-    @UriParam(defaultValue = "5000")
+    @UriParam(label = "timeout", defaultValue = "5000")
     int bindRetryInterval = 5000;
 
-    @UriParam(defaultValue = "60000")
+    @UriParam(label = "timeout", defaultValue = "60000")
     int acceptTimeout = 60000;
 
-    @UriParam(defaultValue = "30000")
+    @UriParam(label = "timeout", defaultValue = "30000")
     int connectTimeout = 30000;
 
-    @UriParam(defaultValue = "10000")
+    @UriParam(label = "timeout", defaultValue = "10000")
     int receiveTimeout = 10000;
 
+    @UriParam(label = "timeout", defaultValue = "-1")
+    int maxReceiveTimeouts = -1;
+
+    @UriParam(label = "timeout", defaultValue = "500")
+    int readTimeout = 500;
+
     @UriParam(defaultValue = "true")
     boolean keepAlive = true;
 
@@ -81,16 +88,25 @@ public class MllpEndpoint extends DefaultEndpoint {
     @UriParam
     boolean reuseAddress;
 
-    @UriParam
+    @UriParam(label = "advanced")
     Integer receiveBufferSize;
 
-    @UriParam
+    @UriParam(label = "advanced")
     Integer sendBufferSize;
 
     @UriParam(defaultValue = "true")
     boolean autoAck = true;
 
-    @UriParam
+    @UriParam(defaultValue = "true")
+    boolean hl7Headers = true;
+
+    @UriParam(defaultValue = "true")
+    boolean bufferWrites = true;
+
+    @UriParam(defaultValue = "false")
+    boolean validatePayload;
+
+    @UriParam(label = "codec")
     String charsetName;
 
     public MllpEndpoint(String uri, MllpComponent component) {
@@ -223,7 +239,7 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Timeout value while waiting for a TCP connection
+     * Timeout (in milliseconds) while waiting for a TCP connection
      * <p/>
      * TCP Server Only
      *
@@ -238,7 +254,7 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Timeout value for establishing for a TCP connection
+     * Timeout (in milliseconds) for establishing a TCP connection
      * <p/>
      * TCP Client only
      *
@@ -253,7 +269,7 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * The SO_TIMEOUT value used when waiting for the start of an MLLP frame
+     * The SO_TIMEOUT value (in milliseconds) used when waiting for the start of an MLLP frame
      *
      * @param receiveTimeout timeout in milliseconds
      */
@@ -261,6 +277,32 @@ public class MllpEndpoint extends DefaultEndpoint {
         this.receiveTimeout = receiveTimeout;
     }
 
+    public int getMaxReceiveTimeouts() {
+        return maxReceiveTimeouts;
+    }
+
+    /**
+     * The maximum number of timeouts (specified by receiveTimeout) allowed before the TCP Connection will be reset.
+     *
+     * @param maxReceiveTimeouts maximum number of receiveTimeouts
+     */
+    public void setMaxReceiveTimeouts(int maxReceiveTimeouts) {
+        this.maxReceiveTimeouts = maxReceiveTimeouts;
+    }
+
+    public int getReadTimeout() {
+        return readTimeout;
+    }
+
+    /**
+     * The SO_TIMEOUT value (in milliseconds) used after the start of an MLLP frame has been received
+     *
+     * @param readTimeout timeout in milliseconds
+     */
+    public void setReadTimeout(int readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+
     public boolean isKeepAlive() {
         return keepAlive;
     }
@@ -305,7 +347,7 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Sets the SO_RCVBUF option to the specified value
+     * Sets the SO_RCVBUF option to the specified value (in bytes)
      *
      * @param receiveBufferSize the SO_RCVBUF option value.  If null, the system default is used
      */
@@ -318,7 +360,7 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Sets the SO_SNDBUF option to the specified value
+     * Sets the SO_SNDBUF option to the specified value (in bytes)
      *
      * @param sendBufferSize the SO_SNDBUF option value.  If null, the system default is used
      */
@@ -341,4 +383,52 @@ public class MllpEndpoint extends DefaultEndpoint {
         this.autoAck = autoAck;
     }
 
+    public boolean isHl7Headers() {
+        return hl7Headers;
+    }
+
+    /**
+     * Enable/Disable the automatic generation of message headers from the HL7 Message
+     *
+     * MLLP Consumers only
+     *
+     * @param hl7Headers enabled if true, otherwise disabled
+     */
+    public void setHl7Headers(boolean hl7Headers) {
+        this.hl7Headers = hl7Headers;
+    }
+
+    public boolean isValidatePayload() {
+        return validatePayload;
+    }
+
+    /**
+     * Enable/Disable the validation of HL7 Payloads
+     *
+     * If enabled, HL7 Payloads received from external systems will be validated (see Hl7Util.generateInvalidPayloadExceptionMessage for details on the validation).
+     * If an invalid payload is detected, a MllpInvalidMessageException (for consumers) or a MllpInvalidAcknowledgementException (for producers) will be thrown.
+     *
+     * @param validatePayload enabled if true, otherwise disabled
+     */
+    public void setValidatePayload(boolean validatePayload) {
+        this.validatePayload = validatePayload;
+    }
+
+    public boolean isBufferWrites() {
+        return bufferWrites;
+    }
+
+    /**
+     * Enable/Disable the buffering of MLLP Payloads before they are written to the external system
+     *
+     * If enabled, MLLP Payloads are buffered and written to the external system in a single write(byte[]) operation.
+     * If disabled, the MLLP payload will not be buffered, and three write operations will be used.  The first operation
+     * will write the MLLP start-of-block character {0x0b (ASCII VT)}, the second operation will write the HL7 payload, and the
+     * third operation will write the MLLP end-of-block character and the MLLP end-of-data character {[0x1c, 0x0d] (ASCII [FS, CR])}.
+     *
+     * @param bufferWrites enabled if true, otherwise disabled
+     */
+    public void setBufferWrites(boolean bufferWrites) {
+        this.bufferWrites = bufferWrites;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
index a1bb182..0d2c1ce 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
@@ -20,30 +20,88 @@ package org.apache.camel.component.mllp;
  * Base class for all MLLP Exceptions, and also used as a generic MLLP exception
  */
 public class MllpException extends Exception {
-    private final byte[] mllpPayload;
+    private final byte[] hl7Message;
+    private final byte[] hl7Acknowledgement;
 
     public MllpException(String message) {
         super(message);
-        this.mllpPayload = null;
+        this.hl7Message = null;
+        this.hl7Acknowledgement = null;
     }
 
-    public MllpException(String message, byte[] mllpPayload) {
+    public MllpException(String message, byte[] hl7Message) {
         super(message);
-        this.mllpPayload = mllpPayload;
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = null;
+    }
+
+    public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message);
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
     }
 
     public MllpException(String message, Throwable cause) {
         super(message, cause);
-        this.mllpPayload = null;
+        this.hl7Message = null;
+        this.hl7Acknowledgement = null;
     }
 
-    public MllpException(String message, byte[] mllpPayload, Throwable cause) {
+    public MllpException(String message, byte[] hl7Message, Throwable cause) {
         super(message, cause);
-        this.mllpPayload = mllpPayload;
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = null;
+    }
+
+    public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, cause);
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
+    }
+
+
+    /**
+     * Get the HL7 message payload associated with this exception, if any.
+     *
+     * @return HL7 message payload
+     */
+    public byte[] getHl7Message() {
+        return hl7Message;
+    }
+
+    /**
+     * Get the HL7 acknowledgement payload associated with this exception, if any.
+     *
+     * @return HL7 acknowledgement payload
+     */
+    public byte[] getHl7Acknowledgement() {
+        return hl7Acknowledgement;
+    }
+
+    /**
+     * Override the base version of this method, and include the HL7 Message and Acknowledgement, if any
+     *
+     * @return the detail message of this MLLP Exception
+     */
+    @Override
+    public String getMessage() {
+        if (MllpComponent.isLogPhi()) {
+            return String.format("%s \n\t{hl7Message= %s} \n\t{hl7Acknowledgement= %s}",
+                    super.getMessage(), MllpComponent.covertBytesToPrintFriendlyString(hl7Message), MllpComponent.covertBytesToPrintFriendlyString(hl7Acknowledgement));
+        } else {
+            return super.getMessage();
+        }
     }
 
+    /**
+     * Return the MLLP Payload that is most likely the cause of the Exception
+     *
+     * If the HL7 Acknowledgement is present, return it.  Otherwise, return the HL7 Message.
+     *
+     * @return the HL7 Acknowledgement if one is present, otherwise the HL7 Message (which may be null)
+     */
     public byte[] getMllpPayload() {
-        return mllpPayload;
+        return (hl7Acknowledgement != null  &&  hl7Acknowledgement.length > 0) ? hl7Acknowledgement : hl7Message;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
new file mode 100644
index 0000000..7b7a3c4
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * @deprecated - replaced by more specific exceptions.  MllpTimeoutException, MllpInvalidMessageException and MllpInvalidAcknowledgementException
+ * Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting
+ * to readEnvelopedPayload or writeEnvelopedMessage a MLLP payload.
+ */
+public class MllpFrameException extends MllpException {
+    public MllpFrameException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpFrameException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+
+    public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
index 495cfae..67d5316 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
@@ -20,19 +20,11 @@ package org.apache.camel.component.mllp;
  * Raised when a MLLP Producer receives a HL7 Acknowledgement for which the HL7 Acknowledgement type cannot be determined.
  */
 public class MllpInvalidAcknowledgementException extends MllpAcknowledgementException {
-    public MllpInvalidAcknowledgementException(String message) {
-        super(message);
+    public MllpInvalidAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
     }
 
-    public MllpInvalidAcknowledgementException(String message, byte[] mllpPayload) {
-        super(message, mllpPayload);
-    }
-
-    public MllpInvalidAcknowledgementException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public MllpInvalidAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, mllpPayload, cause);
+    public MllpInvalidAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
new file mode 100644
index 0000000..9cde29a
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer receives an invalid HL7 Message.
+ */
+public class MllpInvalidMessageException extends MllpException {
+    public MllpInvalidMessageException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpInvalidMessageException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
new file mode 100644
index 0000000..6b8a4f8
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Abstract base for all MLLP Negative Acknowledgements
+ */
+public abstract class MllpNegativeAcknowledgementException extends MllpAcknowledgementException {
+    public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+
+    public abstract String getAcknowledgmentType();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
new file mode 100644
index 0000000..2268137
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when the receipt of an HL7 Acknowledgement fails
+ */
+public class MllpReceiveAcknowledgementException extends MllpAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "HL7 Acknowledgment Receipt Failed";
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message) {
+        super(EXCEPTION_MESSAGE, hl7Message);
+    }
+
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, cause);
+    }
+
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
+    }
+
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
new file mode 100644
index 0000000..c755d02
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when an error is encountered while receiving an MLLP payload
+ */
+public class MllpReceiveException extends MllpException {
+    public MllpReceiveException(String message) {
+        super(message);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpReceiveException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 2e74f6c..69176bc 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -19,16 +19,20 @@ package org.apache.camel.component.mllp;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketException;
-import java.net.SocketTimeoutException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.component.mllp.impl.MllpUtil;
+import org.apache.camel.component.mllp.impl.Hl7Util;
+import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
+import org.apache.camel.component.mllp.impl.MllpSocketReader;
+import org.apache.camel.component.mllp.impl.MllpSocketUtil;
+import org.apache.camel.component.mllp.impl.MllpSocketWriter;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
 
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT;
+import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND;
@@ -46,6 +50,9 @@ public class MllpTcpClientProducer extends DefaultProducer {
 
     Socket socket;
 
+    MllpSocketReader mllpSocketReader;
+    MllpSocketWriter mllpSocketWriter;
+
     public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException {
         super(endpoint);
         log.trace("MllpTcpClientProducer(endpoint)");
@@ -64,7 +71,7 @@ public class MllpTcpClientProducer extends DefaultProducer {
     protected void doStop() throws Exception {
         log.trace("doStop()");
 
-        MllpUtil.closeConnection(socket);
+        MllpSocketUtil.close(socket, log, "Stopping component");
 
         super.doStop();
     }
@@ -75,15 +82,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
 
         // Check BEFORE_SEND Properties
         if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) {
-            MllpUtil.resetConnection(socket);
+            MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class));
             return;
         } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) {
-            MllpUtil.closeConnection(socket);
+            MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class));
+            return;
         }
 
-        Exception connectionException = checkConnection();
-        if (null != connectionException) {
-            exchange.setException(connectionException);
+        // Establish a connection if needed
+        try {
+            checkConnection();
+        } catch (IOException ioEx) {
+            exchange.setException(ioEx);
             return;
         }
 
@@ -94,95 +104,118 @@ public class MllpTcpClientProducer extends DefaultProducer {
             message = exchange.getIn();
         }
 
+        message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
+        message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress().toString());
+
+        // Send the message to the external system
         byte[] hl7MessageBytes = message.getMandatoryBody(byte[].class);
+        byte[] acknowledgementBytes = null;
 
-        log.debug("Sending message to external system");
         try {
-            MllpUtil.writeFramedPayload(socket, hl7MessageBytes);
+            log.debug("Sending message to external system");
+            mllpSocketWriter.writeEnvelopedPayload(hl7MessageBytes, null);
+            log.debug("Reading acknowledgement from external system");
+            acknowledgementBytes = mllpSocketReader.readEnvelopedPayload(hl7MessageBytes);
+        } catch (MllpWriteException writeEx) {
+            MllpSocketUtil.reset(socket, log, writeEx.getMessage());
+            exchange.setException(writeEx);
+            return;
+        } catch (MllpReceiveException ackReceiveEx) {
+            MllpSocketUtil.reset(socket, log, ackReceiveEx.getMessage());
+            exchange.setException(ackReceiveEx);
+            return;
         } catch (MllpException mllpEx) {
+            Throwable mllpExCause = mllpEx.getCause();
+            if (mllpExCause != null && mllpExCause instanceof IOException) {
+                MllpSocketUtil.reset(socket, log, mllpEx.getMessage());
+            }
             exchange.setException(mllpEx);
             return;
         }
 
-        log.debug("Reading acknowledgement from external system");
-        byte[] acknowledgementBytes = null;
-        try {
-            if (MllpUtil.openFrame(socket)) {
-                acknowledgementBytes = MllpUtil.closeFrame(socket);
+        log.debug("Populating message headers with the acknowledgement from the external system");
+        message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+        message.setHeader(MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, IOHelper.getCharsetName(exchange, true)));
+
+        if (endpoint.validatePayload) {
+            String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
+            if (exceptionMessage != null) {
+                exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, hl7MessageBytes, acknowledgementBytes));
+                return;
             }
-        } catch (SocketTimeoutException timeoutEx) {
-            exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx));
-            return;
+        }
+
+        log.debug("Processing the acknowledgement from the external system");
+        try {
+            String acknowledgementType = processAcknowledgment(hl7MessageBytes, acknowledgementBytes);
+            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementType);
         } catch (MllpException mllpEx) {
             exchange.setException(mllpEx);
             return;
         }
 
-        if (null != acknowledgementBytes) {
-            log.debug("Populating the exchange with the acknowledgement from the external system");
-            message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+        // Check AFTER_SEND Properties
+        if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
+            MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class));
+        } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
+            MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class));
+        }
+    }
 
-            message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
-            message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
+    private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException {
+        String acknowledgementType = "";
 
-            // Now, extract the acknowledgement type and check for a NACK
-            byte fieldDelim = acknowledgementBytes[3];
+        if (hl7AcknowledgementBytes != null && hl7AcknowledgementBytes.length > 3) {
+            // Extract the acknowledgement type and check for a NACK
+            byte fieldDelim = hl7AcknowledgementBytes[3];
             // First, find the beginning of the MSA segment - should be the second segment
             int msaStartIndex = -1;
-            for (int i = 0; i < acknowledgementBytes.length; ++i) {
-                if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
+            for (int i = 0; i < hl7AcknowledgementBytes.length; ++i) {
+                if (SEGMENT_DELIMITER == hl7AcknowledgementBytes[i]) {
                     final byte bM = 77;
                     final byte bS = 83;
                     final byte bC = 67;
                     final byte bA = 65;
                     final byte bE = 69;
                     final byte bR = 82;
-                        /* We've found the start of a new segment - make sure peeking ahead
-                           won't run off the end of the array - we need at least 7 more bytes
-                         */
-                    if (acknowledgementBytes.length > i + 7) {
+                    /* We've found the start of a new segment - make sure peeking ahead
+                       won't run off the end of the array - we need at least 7 more bytes
+                     */
+                    if (hl7AcknowledgementBytes.length > i + 7) {
                         // We can safely peek ahead
-                        if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
+                        if (bM == hl7AcknowledgementBytes[i + 1] && bS == hl7AcknowledgementBytes[i + 2] && bA == hl7AcknowledgementBytes[i + 3] && fieldDelim == hl7AcknowledgementBytes[i + 4]) {
                             // Found the beginning of the MSA - the next two bytes should be our acknowledgement code
                             msaStartIndex = i + 1;
-                            if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) {
-                                exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+                            if (bA != hl7AcknowledgementBytes[i + 5] && bC != hl7AcknowledgementBytes[i + 5]) {
+                                String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+                                throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
                             } else {
-                                String acknowledgemenTypeString;
-                                switch (acknowledgementBytes[i + 6]) {
+                                switch (hl7AcknowledgementBytes[i + 6]) {
                                 case bA:
-                                    // We have an AA or CA- make sure that's the end of the field
-                                    if (fieldDelim != acknowledgementBytes[i + 7]) {
-                                        exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
-                                    }
-                                    if (bA == acknowledgementBytes[i + 5]) {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+                                    // We have an AA or CA
+                                    if (bA == hl7AcknowledgementBytes[i + 5]) {
+                                        acknowledgementType = "AA";
                                     } else {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
+                                        acknowledgementType = "CA";
                                     }
                                     break;
                                 case bE:
                                     // We have an AE or CE
-                                    if (bA == acknowledgementBytes[i + 5]) {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
-                                        exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes)));
+                                    if (bA == hl7AcknowledgementBytes[i + 5]) {
+                                        throw new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     } else {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
-                                        exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes)));
+                                        throw new MllpCommitErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     }
-                                    break;
                                 case bR:
                                     // We have an AR or CR
-                                    if (bA == acknowledgementBytes[i + 5]) {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
-                                        exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes)));
+                                    if (bA == hl7AcknowledgementBytes[i + 5]) {
+                                        throw new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     } else {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
-                                        exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes)));
+                                        throw new MllpCommitRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     }
-                                    break;
                                 default:
-                                    exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+                                    String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+                                    throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
                                 }
                             }
 
@@ -192,18 +225,13 @@ public class MllpTcpClientProducer extends DefaultProducer {
                 }
 
             }
-            if (-1 == msaStartIndex) {
+            if (-1 == msaStartIndex  &&  endpoint.validatePayload) {
                 // Didn't find an MSA
-                exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+                throw new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, hl7AcknowledgementBytes);
             }
         }
-        // Check AFTER_SEND Properties
-        if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
-            MllpUtil.resetConnection(socket);
-            return;
-        } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
-            MllpUtil.closeConnection(socket);
-        }
+
+        return acknowledgementType;
     }
 
     /**
@@ -211,28 +239,24 @@ public class MllpTcpClientProducer extends DefaultProducer {
      *
      * @return null if the connection is valid, otherwise the Exception encountered checking the connection
      */
-    Exception checkConnection() {
+    void checkConnection() throws IOException {
         if (null == socket || socket.isClosed() || !socket.isConnected()) {
             socket = new Socket();
 
-            try {
-                socket.setKeepAlive(endpoint.keepAlive);
-                socket.setTcpNoDelay(endpoint.tcpNoDelay);
-                if (null != endpoint.receiveBufferSize) {
-                    socket.setReceiveBufferSize(endpoint.receiveBufferSize);
-                }
-                if (null != endpoint.sendBufferSize) {
-                    socket.setSendBufferSize(endpoint.sendBufferSize);
-                }
-                socket.setReuseAddress(endpoint.reuseAddress);
-                socket.setSoLinger(false, -1);
-
-                // Read Timeout
-                socket.setSoTimeout(endpoint.receiveTimeout);
-            } catch (SocketException e) {
-                return e;
+            socket.setKeepAlive(endpoint.keepAlive);
+            socket.setTcpNoDelay(endpoint.tcpNoDelay);
+            if (null != endpoint.receiveBufferSize) {
+                socket.setReceiveBufferSize(endpoint.receiveBufferSize);
+            } else {
+                endpoint.receiveBufferSize = socket.getReceiveBufferSize();
             }
-
+            if (null != endpoint.sendBufferSize) {
+                socket.setSendBufferSize(endpoint.sendBufferSize);
+            } else {
+                endpoint.sendBufferSize = socket.getSendBufferSize();
+            }
+            socket.setReuseAddress(endpoint.reuseAddress);
+            socket.setSoLinger(false, -1);
 
             InetSocketAddress socketAddress;
             if (null == endpoint.getHostname()) {
@@ -240,17 +264,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
             } else {
                 socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
             }
+
             log.debug("Connecting to socket on {}", socketAddress);
-            try {
-                socket.connect(socketAddress, endpoint.connectTimeout);
-            } catch (SocketTimeoutException e) {
-                return e;
-            } catch (IOException e) {
-                return e;
+            socket.connect(socketAddress, endpoint.connectTimeout);
+
+            log.debug("Creating MllpSocketReader and MllpSocketWriter");
+            mllpSocketReader = new MllpSocketReader(socket, endpoint.receiveTimeout, endpoint.readTimeout, true);
+            if (endpoint.bufferWrites) {
+                mllpSocketWriter = new MllpBufferedSocketWriter(socket, false);
+            } else {
+                mllpSocketWriter = new MllpSocketWriter(socket, false);
             }
         }
-
-        return null;
     }
 
 }