You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/16 08:08:17 UTC

[6/8] camel git commit: CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket

CAMEL-10511:  Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6d58b67
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6d58b67
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6d58b67

Branch: refs/heads/camel-2.18.x
Commit: e6d58b67ccfdf024854234cfd9a1a02cde149772
Parents: 0ed067b
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Tue Dec 6 12:23:03 2016 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 16 08:58:13 2016 +0100

----------------------------------------------------------------------
 .../MllpAcknowledgementDeliveryException.java   |   8 +
 .../mllp/MllpAcknowledgementException.java      |  42 +-
 .../MllpAcknowledgementTimeoutException.java    |  69 +++
 .../MllpAcknowledgementTimoutException.java     |  30 --
 ...pplicationErrorAcknowledgementException.java |   7 +-
 ...plicationRejectAcknowledgementException.java |   7 +-
 ...MllpCommitErrorAcknowledgementException.java |   7 +-
 ...llpCommitRejectAcknowledgementException.java |   7 +-
 .../camel/component/mllp/MllpComponent.java     |  38 ++
 .../camel/component/mllp/MllpConstants.java     |   1 +
 .../camel/component/mllp/MllpEndpoint.java      |  40 ++
 .../camel/component/mllp/MllpException.java     |  81 ++-
 .../component/mllp/MllpFrameException.java      |  39 +-
 .../mllp/MllpInvalidMessageException.java       |  30 ++
 .../MllpNegativeAcknowledgementException.java   |  32 ++
 .../MllpReceiveAcknowledgementException.java    |  57 ++
 .../component/mllp/MllpReceiveException.java    |  46 ++
 .../component/mllp/MllpTcpClientProducer.java   | 208 ++++----
 .../component/mllp/MllpTcpServerConsumer.java   | 318 ++++++++---
 .../component/mllp/MllpTimeoutException.java    |  50 +-
 .../component/mllp/MllpWriteException.java      |  37 +-
 .../AcknowledgmentSynchronizationAdapter.java   | 212 --------
 .../camel/component/mllp/impl/Hl7Util.java      |  90 ++++
 .../mllp/impl/MllpBufferedSocketWriter.java     | 123 +++++
 .../component/mllp/impl/MllpSocketReader.java   | 290 ++++++++++
 .../component/mllp/impl/MllpSocketUtil.java     | 230 ++++++++
 .../component/mllp/impl/MllpSocketWriter.java   | 143 +++++
 .../camel/component/mllp/impl/MllpUtil.java     | 390 --------------
 .../mllp/Hl7AcknowledgementGenerator.java       |   1 +
 .../mllp/MllpAcknowledgementExceptionTest.java  | 127 -----
 .../camel/component/mllp/MllpExceptionTest.java | 114 ++++
 .../component/mllp/MllpFrameExceptionTest.java  | 101 ----
 .../mllp/MllpProducerConsumerLoopbackTest.java  |  16 +-
 .../MllpTcpClientConsumerBlueprintTest.java     |  36 --
 ...llpTcpClientProducerAcknowledgementTest.java | 207 ++++++-
 ...ntProducerAcknowledgementValidationTest.java | 283 ++++++++++
 .../MllpTcpClientProducerBlueprintTest.java     |  14 +-
 ...llpTcpClientProducerConnectionErrorTest.java | 165 ++++++
 .../mllp/MllpTcpClientProducerTest.java         |  82 ++-
 ...llpTcpServerConsumerAcknowledgementTest.java |   5 +-
 .../MllpTcpServerConsumerConnectionTest.java    |  35 +-
 ...pTcpServerConsumerMessageValidationTest.java | 317 +++++++++++
 .../mllp/MllpTcpServerConsumerTest.java         | 126 ++++-
 .../MllpTcpServerProducerBlueprintTest.java     |  35 --
 .../mllp/MllpTimeoutExceptionTest.java          | 101 ----
 .../component/mllp/MllpWriteExceptionTest.java  | 101 ----
 .../camel/component/mllp/impl/Hl7UtilTest.java  | 126 +++++
 ...BufferedSocketAcknowledgementWriterTest.java | 125 +++++
 .../MllpBufferedSocketMessageWriterTest.java    | 126 +++++
 .../MllpSocketAcknowledgementReaderTest.java    | 533 +++++++++++++++++++
 .../MllpSocketAcknowledgementWriterTest.java    | 150 ++++++
 .../mllp/impl/MllpSocketMessageReaderTest.java  | 527 ++++++++++++++++++
 .../mllp/impl/MllpSocketMessageWriterTest.java  | 150 ++++++
 .../mllp/impl/MllpSocketReaderTestSupport.java  | 342 ++++++++++++
 .../MllpSocketUtilExceptionHandlingTest.java    | 133 +++++
 .../impl/MllpSocketUtilFindXxxOfBlockTest.java  | 241 +++++++++
 .../mllp/impl/MllpSocketUtilSocketTest.java     | 288 ++++++++++
 .../mllp/impl/MllpSocketWriterTestSupport.java  | 132 +++++
 .../junit/rule/mllp/MllpServerResource.java     | 497 +++++++++++------
 .../apache/camel/test/util/PayloadBuilder.java  | 211 ++++++++
 .../src/test/resources/log4j2.properties        |  10 +-
 61 files changed, 6422 insertions(+), 1667 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
index 4ac46c9..c6fc878 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
@@ -29,4 +29,12 @@ public class MllpAcknowledgementDeliveryException extends MllpAcknowledgementExc
     public MllpAcknowledgementDeliveryException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
         super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
+
+    public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
index 992d506..296430b 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
@@ -20,49 +20,25 @@ package org.apache.camel.component.mllp;
  * Base class for HL7 Application Acknowledgement Exceptions
  */
 public abstract class MllpAcknowledgementException extends MllpException {
-    private final byte[] hl7Message;
-    private final byte[] hl7Acknowledgement;
 
-    public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+    public MllpAcknowledgementException(String message) {
         super(message);
-        this.hl7Message = hl7Message;
-        this.hl7Acknowledgement = hl7Acknowledgement;
     }
 
-    public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
-        super(message, cause);
-        this.hl7Message = hl7Message;
-        this.hl7Acknowledgement = hl7Acknowledgement;
+    public MllpAcknowledgementException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
     }
 
-    public byte[] getHl7Message() {
-        return hl7Message;
+    public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
     }
 
-    public byte[] getHl7Acknowledgement() {
-        return hl7Acknowledgement;
+    public MllpAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
     }
 
-    @Override
-    public String getMessage() {
-        if (isLogPhi()) {
-            return String.format("%s:\n\tHL7 Message: %s\n\tHL7 Acknowledgement: %s",
-                    super.getMessage(), covertBytesToPrintFriendlyString(hl7Message), covertBytesToPrintFriendlyString(hl7Acknowledgement));
-        } else {
-            return super.getMessage();
-        }
+    public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
     }
 
-    @Override
-    public String toString() {
-        StringBuilder stringBuilder = new StringBuilder(this.getClass().getName());
-
-        stringBuilder.append(": {hl7Message=")
-                .append(covertBytesToPrintFriendlyString(hl7Message))
-                .append(", hl7Acknowledgement=")
-                .append(covertBytesToPrintFriendlyString(hl7Acknowledgement))
-                .append("}");
-
-        return stringBuilder.toString();
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
new file mode 100644
index 0000000..8cb2822
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
+ */
+public class MllpAcknowledgementTimeoutException extends MllpAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "Timeout receiving HL7 Acknowledgement";
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message) {
+        super(EXCEPTION_MESSAGE, hl7Message);
+    }
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement);
+    }
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, cause);
+    }
+
+    public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement, cause);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+        super(message, hl7Message, partialHl7Acknowledgement);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+
+    public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, partialHl7Acknowledgement, cause);
+    }
+
+    /**
+     * Get the HL7 acknowledgement payload associated with this exception, if any.
+     *
+     * @return If the timeout occurred while attempting to receive an HL7 Message, this will be null.  If the timeout
+     * occurred while attempting to receive an HL7 Acknowledgement, this will be the HL7 Message.  If the timeout occurred
+     * while attempting to complete the read of an HL7 message (i.e. part of the message has already been read), this
+     * will be the partial acknowledgement payload that was read before the timeout.
+     */
+    public byte[] getHl7Acknowledgement() {
+        return super.getHl7Acknowledgement();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
deleted file mode 100644
index a1cf147..0000000
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-/**
- * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
- */
-public class MllpAcknowledgementTimoutException extends MllpTimeoutException {
-    public MllpAcknowledgementTimoutException(String message, byte[] hl7Message) {
-        super(message, hl7Message);
-    }
-
-    public MllpAcknowledgementTimoutException(String message, byte[] hl7Message, Throwable cause) {
-        super(message, hl7Message, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
index 5b6fed1..c8db47a 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Application Error Acknowledgement
  */
-public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledgementException {
+public class MllpApplicationErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
     static final String EXCEPTION_MESSAGE = "HL7 Application Error Acknowledgment Received";
 
     public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledg
     public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
         super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
+
+    @Override
+    public String getAcknowledgmentType() {
+        return "AE";
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
index 76cbf2c..00dd3e4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Application Reject Acknowledgement
  */
-public class MllpApplicationRejectAcknowledgementException extends MllpAcknowledgementException {
+public class MllpApplicationRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
     static final String EXCEPTION_MESSAGE = "HL7 Application Reject Acknowledgment Received";
 
     public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpApplicationRejectAcknowledgementException extends MllpAcknowled
     public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
         super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
+
+    @Override
+    public String getAcknowledgmentType() {
+        return "AR";
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
index aef1a27..bec34d9 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Commit Error Acknowledgement
  */
-public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgementException {
+public class MllpCommitErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
     static final String EXCEPTION_MESSAGE = "HL7 Commit Error Acknowledgment Received";
 
     public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgement
     public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
         super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
+
+    @Override
+    public String getAcknowledgmentType() {
+        return "CE";
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
index 3907694..f98ee5e 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
 /**
  * Raised when a MLLP Producer receives a HL7 Commit Reject Acknowledgement
  */
-public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgementException {
+public class MllpCommitRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
     static final String EXCEPTION_MESSAGE = "HL7 Commit Reject Acknowledgment Received";
 
     public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgemen
     public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
         super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
     }
+
+    @Override
+    public String getAcknowledgmentType() {
+        return "CR";
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
index b23a421..f3e22b1 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
@@ -22,6 +22,10 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
 /**
  * Represents the component that manages {@link MllpEndpoint}.
  */
@@ -64,4 +68,38 @@ public class MllpComponent extends UriEndpointComponent {
         return endpoint;
     }
 
+    public static boolean isLogPhi() {
+        String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+        return Boolean.valueOf(logPhiProperty);
+    }
+
+    public static String covertToPrintFriendlyString(String hl7Message) {
+        if (hl7Message == null) {
+            return "null";
+        } else if (hl7Message.isEmpty()) {
+            return "empty";
+        }
+
+        return hl7Message.replaceAll("" + START_OF_BLOCK, "<VT>").replaceAll("" + END_OF_BLOCK, "<FS>").replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
+    }
+
+    public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes) {
+        if (hl7Bytes == null) {
+            return "null";
+        } else if (hl7Bytes.length == 0) {
+            return "";
+        }
+
+        return covertBytesToPrintFriendlyString(hl7Bytes, 0, hl7Bytes.length);
+    }
+
+    public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes, int startPosition, int length) {
+        if (null == hl7Bytes) {
+            return "null";
+        } else if (hl7Bytes.length == 0) {
+            return "";
+        }
+        return covertToPrintFriendlyString(new String(hl7Bytes, startPosition, length));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
index 8275a0e..4e14da0 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
@@ -21,6 +21,7 @@ public final class MllpConstants {
     public static final String MLLP_REMOTE_ADDRESS = "CamelMllpRemoteAddress";
 
     public static final String MLLP_ACKNOWLEDGEMENT = "CamelMllpAcknowledgement";
+    public static final String MLLP_ACKNOWLEDGEMENT_STRING = "CamelMllpAcknowledgementString";
     public static final String MLLP_ACKNOWLEDGEMENT_TYPE = "CamelMllpAcknowledgementType";
 
     public static final String MLLP_ACKNOWLEDGEMENT_EXCEPTION = "CamelMllpAcknowledgementException";

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index ae2e01f..3732254 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -100,6 +100,12 @@ public class MllpEndpoint extends DefaultEndpoint {
     @UriParam(defaultValue = "true")
     boolean hl7Headers = true;
 
+    @UriParam(defaultValue = "true")
+    boolean bufferWrites = true;
+
+    @UriParam(defaultValue = "false")
+    boolean validatePayload;
+
     @UriParam(label = "codec")
     String charsetName;
 
@@ -391,4 +397,38 @@ public class MllpEndpoint extends DefaultEndpoint {
     public void setHl7Headers(boolean hl7Headers) {
         this.hl7Headers = hl7Headers;
     }
+
+    public boolean isValidatePayload() {
+        return validatePayload;
+    }
+
+    /**
+     * Enable/Disable the validation of HL7 Payloads
+     *
+     * If enabled, HL7 Payloads received from external systems will be validated (see Hl7Util.generateInvalidPayloadExceptionMessage for details on the validation).
+     * If and invalid payload is detected, a MllpInvalidMessageException (for consumers) or a MllpInvalidAcknowledgementException will be thrown.
+     *
+     * @param validatePayload enabled if true, otherwise disabled
+     */
+    public void setValidatePayload(boolean validatePayload) {
+        this.validatePayload = validatePayload;
+    }
+
+    public boolean isBufferWrites() {
+        return bufferWrites;
+    }
+
+    /**
+     * Enable/Disable the validation of HL7 Payloads
+     *
+     * If enabled, MLLP Payloads are buffered and written to the external system in a single write(byte[]) operation.
+     * If disabled, the MLLP payload will not be buffered, and three write operations will be used.  The first operation
+     * will write the MLLP start-of-block character {0x0b (ASCII VT)}, the second operation will write the HL7 payload, and the
+     * third operation will writh the MLLP end-of-block character and the MLLP end-of-data character {[0x1c, 0x0d] (ASCII [FS, CR])}.
+     *
+     * @param bufferWrites enabled if true, otherwise disabled
+     */
+    public void setBufferWrites(boolean bufferWrites) {
+        this.bufferWrites = bufferWrites;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
index f0d5e84..0d2c1ce 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
@@ -20,25 +20,88 @@ package org.apache.camel.component.mllp;
  * Base class for all MLLP Exceptions, and also used as a generic MLLP exception
  */
 public class MllpException extends Exception {
+    private final byte[] hl7Message;
+    private final byte[] hl7Acknowledgement;
+
     public MllpException(String message) {
         super(message);
+        this.hl7Message = null;
+        this.hl7Acknowledgement = null;
+    }
+
+    public MllpException(String message, byte[] hl7Message) {
+        super(message);
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = null;
+    }
+
+    public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message);
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
     }
 
     public MllpException(String message, Throwable cause) {
         super(message, cause);
+        this.hl7Message = null;
+        this.hl7Acknowledgement = null;
     }
 
-    public boolean isLogPhi() {
-        String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
-        return Boolean.valueOf(logPhiProperty);
+    public MllpException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, cause);
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = null;
     }
 
-    protected String covertBytesToPrintFriendlyString(byte[] hl7Bytes) {
-        if (null == hl7Bytes) {
-            return "null";
-        } else if (hl7Bytes.length == 0) {
-            return "";
+    public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, cause);
+        this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+        this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
+    }
+
+
+    /**
+     * Get the HL7 message payload associated with this exception, if any.
+     *
+     * @return HL7 message payload
+     */
+    public byte[] getHl7Message() {
+        return hl7Message;
+    }
+
+    /**
+     * Get the HL7 acknowledgement payload associated with this exception, if any.
+     *
+     * @return HL7 acknowledgement payload
+     */
+    public byte[] getHl7Acknowledgement() {
+        return hl7Acknowledgement;
+    }
+
+    /**
+     * Override the base version of this method, and include the HL7 Message and Acknowledgement, if any
+     *
+     * @return the detail message of this MLLP Exception
+     */
+    @Override
+    public String getMessage() {
+        if (MllpComponent.isLogPhi()) {
+            return String.format("%s \n\t{hl7Message= %s} \n\t{hl7Acknowledgement= %s}",
+                    super.getMessage(), MllpComponent.covertBytesToPrintFriendlyString(hl7Message), MllpComponent.covertBytesToPrintFriendlyString(hl7Acknowledgement));
+        } else {
+            return super.getMessage();
         }
-        return new String(hl7Bytes).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
     }
+
+    /**
+     * Return the MLLP Payload that is most likely the cause of the Exception
+     *
+     * If the HL7 Acknowledgement is present, return it.  Otherwise, return the HL7 Message.
+     *
+     * @return the MLLP Payload with the framing error
+     */
+    public byte[] getMllpPayload() {
+        return (hl7Acknowledgement != null  &&  hl7Acknowledgement.length > 0) ? hl7Acknowledgement : hl7Message;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
index 43b1281..7b7a3c4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
@@ -17,44 +17,25 @@
 package org.apache.camel.component.mllp;
 
 /**
+ * @deprecated - replaced by more specific exceptions.  MllpTimeoutException, MllpInvalidMessageException and MllpInvalidAcknowledgementException
  * Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting
- * to read or write a MLLP payload.
+ * to readEnvelopedPayload or writeEnvelopedMessage a MLLP payload.
  */
 public class MllpFrameException extends MllpException {
-    private final byte[] mllpPayload;
-
-    public MllpFrameException(String message, byte[] mllpPayload) {
-        super(message);
-        this.mllpPayload = mllpPayload;
-    }
-
-    public MllpFrameException(String message, byte[] mllpPayload, Throwable cause) {
-        super(message, cause);
-        this.mllpPayload = mllpPayload;
+    public MllpFrameException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
     }
 
-    public byte[] getMllpPayload() {
-        return mllpPayload;
+    public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
     }
 
-    @Override
-    public String getMessage() {
-        if (isLogPhi()) {
-            return String.format("%s:\n\tMLLP Payload: %s", super.getMessage(), covertBytesToPrintFriendlyString(mllpPayload));
-        } else {
-            return super.getMessage();
-        }
+    public MllpFrameException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
     }
 
-    @Override
-    public String toString() {
-        StringBuilder stringBuilder = new StringBuilder(this.getClass().getName());
-
-        stringBuilder.append(": {mllpPayload=")
-                .append(covertBytesToPrintFriendlyString(mllpPayload))
-                .append("}");
-
-        return stringBuilder.toString();
+    public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
new file mode 100644
index 0000000..9cde29a
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer receives an invalid HL7 Message.
+ */
+public class MllpInvalidMessageException extends MllpException {
+    public MllpInvalidMessageException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpInvalidMessageException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
new file mode 100644
index 0000000..6b8a4f8
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Abstract base for all MLLP Negative Acknowledgements
+ */
+public abstract class MllpNegativeAcknowledgementException extends MllpAcknowledgementException {
+    public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+
+    public abstract String getAcknowledgmentType();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
new file mode 100644
index 0000000..2268137
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer cannot deliver the MLLP Acknowledgement
+ */
+public class MllpReceiveAcknowledgementException extends MllpAcknowledgementException {
+    static final String EXCEPTION_MESSAGE = "HL7 Acknowledgment Receipt Failed";
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message) {
+        super(EXCEPTION_MESSAGE, hl7Message);
+    }
+
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, cause);
+    }
+
+
+    public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
+    }
+
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+    public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
new file mode 100644
index 0000000..c755d02
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer cannot deliver the MLLP Acknowledgement
+ */
+public class MllpReceiveException extends MllpException {
+    public MllpReceiveException(String message) {
+        super(message);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
+    }
+
+    public MllpReceiveException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message, Throwable cause) {
+        super(message, hl7Message, cause);
+    }
+
+    public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 5793aea..69176bc 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -20,14 +20,19 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
-import java.net.SocketTimeoutException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.component.mllp.impl.MllpUtil;
+import org.apache.camel.component.mllp.impl.Hl7Util;
+import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
+import org.apache.camel.component.mllp.impl.MllpSocketReader;
+import org.apache.camel.component.mllp.impl.MllpSocketUtil;
+import org.apache.camel.component.mllp.impl.MllpSocketWriter;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
 
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT;
+import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND;
@@ -45,6 +50,9 @@ public class MllpTcpClientProducer extends DefaultProducer {
 
     Socket socket;
 
+    MllpSocketReader mllpSocketReader;
+    MllpSocketWriter mllpSocketWriter;
+
     public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException {
         super(endpoint);
         log.trace("MllpTcpClientProducer(endpoint)");
@@ -63,7 +71,7 @@ public class MllpTcpClientProducer extends DefaultProducer {
     protected void doStop() throws Exception {
         log.trace("doStop()");
 
-        MllpUtil.closeConnection(socket);
+        MllpSocketUtil.close(socket, log, "Stopping component");
 
         super.doStop();
     }
@@ -74,15 +82,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
 
         // Check BEFORE_SEND Properties
         if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) {
-            MllpUtil.resetConnection(socket);
+            MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class));
             return;
         } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) {
-            MllpUtil.closeConnection(socket);
+            MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class));
+            return;
         }
 
-        Exception connectionException = checkConnection();
-        if (null != connectionException) {
-            exchange.setException(connectionException);
+        // Establish a connection if needed
+        try {
+            checkConnection();
+        } catch (IOException ioEx) {
+            exchange.setException(ioEx);
             return;
         }
 
@@ -93,95 +104,118 @@ public class MllpTcpClientProducer extends DefaultProducer {
             message = exchange.getIn();
         }
 
+        message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
+        message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress().toString());
+
+        // Send the message to the external system
         byte[] hl7MessageBytes = message.getMandatoryBody(byte[].class);
+        byte[] acknowledgementBytes = null;
 
-        log.debug("Sending message to external system");
         try {
-            MllpUtil.writeFramedPayload(socket, hl7MessageBytes);
+            log.debug("Sending message to external system");
+            mllpSocketWriter.writeEnvelopedPayload(hl7MessageBytes, null);
+            log.debug("Reading acknowledgement from external system");
+            acknowledgementBytes = mllpSocketReader.readEnvelopedPayload(hl7MessageBytes);
+        } catch (MllpWriteException writeEx) {
+            MllpSocketUtil.reset(socket, log, writeEx.getMessage());
+            exchange.setException(writeEx);
+            return;
+        } catch (MllpReceiveException ackReceiveEx) {
+            MllpSocketUtil.reset(socket, log, ackReceiveEx.getMessage());
+            exchange.setException(ackReceiveEx);
+            return;
         } catch (MllpException mllpEx) {
+            Throwable mllpExCause = mllpEx.getCause();
+            if (mllpExCause != null && mllpExCause instanceof IOException) {
+                MllpSocketUtil.reset(socket, log, mllpEx.getMessage());
+            }
             exchange.setException(mllpEx);
             return;
         }
 
-        log.debug("Reading acknowledgement from external system");
-        byte[] acknowledgementBytes = null;
-        try {
-            if (MllpUtil.openFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout)) {
-                acknowledgementBytes = MllpUtil.closeFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout);
+        log.debug("Populating message headers with the acknowledgement from the external system");
+        message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+        message.setHeader(MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, IOHelper.getCharsetName(exchange, true)));
+
+        if (endpoint.validatePayload) {
+            String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
+            if (exceptionMessage != null) {
+                exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, hl7MessageBytes, acknowledgementBytes));
+                return;
             }
-        } catch (SocketTimeoutException timeoutEx) {
-            exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", hl7MessageBytes, timeoutEx));
-            return;
+        }
+
+        log.debug("Processing the acknowledgement from the external system");
+        try {
+            String acknowledgementType = processAcknowledgment(hl7MessageBytes, acknowledgementBytes);
+            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementType);
         } catch (MllpException mllpEx) {
             exchange.setException(mllpEx);
             return;
         }
 
-        if (null != acknowledgementBytes) {
-            log.debug("Populating the exchange with the acknowledgement from the external system");
-            message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+        // Check AFTER_SEND Properties
+        if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
+            MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class));
+        } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
+            MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class));
+        }
+    }
 
-            message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
-            message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
+    private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException {
+        String acknowledgementType = "";
 
-            // Now, extract the acknowledgement type and check for a NACK
-            byte fieldDelim = acknowledgementBytes[3];
+        if (hl7AcknowledgementBytes != null && hl7AcknowledgementBytes.length > 3) {
+            // Extract the acknowledgement type and check for a NACK
+            byte fieldDelim = hl7AcknowledgementBytes[3];
             // First, find the beginning of the MSA segment - should be the second segment
             int msaStartIndex = -1;
-            for (int i = 0; i < acknowledgementBytes.length; ++i) {
-                if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
+            for (int i = 0; i < hl7AcknowledgementBytes.length; ++i) {
+                if (SEGMENT_DELIMITER == hl7AcknowledgementBytes[i]) {
                     final byte bM = 77;
                     final byte bS = 83;
                     final byte bC = 67;
                     final byte bA = 65;
                     final byte bE = 69;
                     final byte bR = 82;
-                        /* We've found the start of a new segment - make sure peeking ahead
-                           won't run off the end of the array - we need at least 7 more bytes
-                         */
-                    if (acknowledgementBytes.length > i + 7) {
+                    /* We've found the start of a new segment - make sure peeking ahead
+                       won't run off the end of the array - we need at least 7 more bytes
+                     */
+                    if (hl7AcknowledgementBytes.length > i + 7) {
                         // We can safely peek ahead
-                        if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
+                        if (bM == hl7AcknowledgementBytes[i + 1] && bS == hl7AcknowledgementBytes[i + 2] && bA == hl7AcknowledgementBytes[i + 3] && fieldDelim == hl7AcknowledgementBytes[i + 4]) {
                             // Found the beginning of the MSA - the next two bytes should be our acknowledgement code
                             msaStartIndex = i + 1;
-                            if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) {
-                                exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 2), hl7MessageBytes, acknowledgementBytes));
+                            if (bA != hl7AcknowledgementBytes[i + 5] && bC != hl7AcknowledgementBytes[i + 5]) {
+                                String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+                                throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
                             } else {
-                                String acknowledgemenTypeString;
-                                switch (acknowledgementBytes[i + 6]) {
+                                switch (hl7AcknowledgementBytes[i + 6]) {
                                 case bA:
-                                    // We have an AA or CA- make sure that's the end of the field
-                                    if (fieldDelim != acknowledgementBytes[i + 7]) {
-                                        exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 3), hl7MessageBytes, acknowledgementBytes));
-                                    }
-                                    if (bA == acknowledgementBytes[i + 5]) {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+                                    // We have an AA or CA
+                                    if (bA == hl7AcknowledgementBytes[i + 5]) {
+                                        acknowledgementType = "AA";
                                     } else {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
+                                        acknowledgementType = "CA";
                                     }
                                     break;
                                 case bE:
                                     // We have an AE or CE
-                                    if (bA == acknowledgementBytes[i + 5]) {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
-                                        exchange.setException(new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+                                    if (bA == hl7AcknowledgementBytes[i + 5]) {
+                                        throw new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     } else {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
-                                        exchange.setException(new MllpCommitErrorAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+                                        throw new MllpCommitErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     }
-                                    break;
                                 case bR:
                                     // We have an AR or CR
-                                    if (bA == acknowledgementBytes[i + 5]) {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
-                                        exchange.setException(new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+                                    if (bA == hl7AcknowledgementBytes[i + 5]) {
+                                        throw new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     } else {
-                                        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
-                                        exchange.setException(new MllpCommitRejectAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+                                        throw new MllpCommitRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
                                     }
-                                    break;
                                 default:
-                                    exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 2), hl7MessageBytes, acknowledgementBytes));
+                                    String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+                                    throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
                                 }
                             }
 
@@ -191,18 +225,13 @@ public class MllpTcpClientProducer extends DefaultProducer {
                 }
 
             }
-            if (-1 == msaStartIndex) {
+            if (-1 == msaStartIndex  &&  endpoint.validatePayload) {
                 // Didn't find an MSA
-                exchange.setException(new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, acknowledgementBytes));
+                throw new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, hl7AcknowledgementBytes);
             }
         }
-        // Check AFTER_SEND Properties
-        if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
-            MllpUtil.resetConnection(socket);
-            return;
-        } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
-            MllpUtil.closeConnection(socket);
-        }
+
+        return acknowledgementType;
     }
 
     /**
@@ -210,28 +239,24 @@ public class MllpTcpClientProducer extends DefaultProducer {
      *
      * @return null if the connection is valid, otherwise the Exception encounted checking the connection
      */
-    Exception checkConnection() {
+    void checkConnection() throws IOException {
         if (null == socket || socket.isClosed() || !socket.isConnected()) {
             socket = new Socket();
 
-            try {
-                socket.setKeepAlive(endpoint.keepAlive);
-                socket.setTcpNoDelay(endpoint.tcpNoDelay);
-                if (null != endpoint.receiveBufferSize) {
-                    socket.setReceiveBufferSize(endpoint.receiveBufferSize);
-                }
-                if (null != endpoint.sendBufferSize) {
-                    socket.setSendBufferSize(endpoint.sendBufferSize);
-                }
-                socket.setReuseAddress(endpoint.reuseAddress);
-                socket.setSoLinger(false, -1);
-
-                // Read Timeout
-                socket.setSoTimeout(endpoint.receiveTimeout);
-            } catch (SocketException e) {
-                return e;
+            socket.setKeepAlive(endpoint.keepAlive);
+            socket.setTcpNoDelay(endpoint.tcpNoDelay);
+            if (null != endpoint.receiveBufferSize) {
+                socket.setReceiveBufferSize(endpoint.receiveBufferSize);
+            } else {
+                endpoint.receiveBufferSize = socket.getReceiveBufferSize();
             }
-
+            if (null != endpoint.sendBufferSize) {
+                socket.setSendBufferSize(endpoint.sendBufferSize);
+            } else {
+                endpoint.sendBufferSize = socket.getSendBufferSize();
+            }
+            socket.setReuseAddress(endpoint.reuseAddress);
+            socket.setSoLinger(false, -1);
 
             InetSocketAddress socketAddress;
             if (null == endpoint.getHostname()) {
@@ -239,17 +264,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
             } else {
                 socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
             }
+
             log.debug("Connecting to socket on {}", socketAddress);
-            try {
-                socket.connect(socketAddress, endpoint.connectTimeout);
-            } catch (SocketTimeoutException e) {
-                return e;
-            } catch (IOException e) {
-                return e;
+            socket.connect(socketAddress, endpoint.connectTimeout);
+
+            log.debug("Creating MllpSocketReader and MllpSocketWriter");
+            mllpSocketReader = new MllpSocketReader(socket, endpoint.receiveTimeout, endpoint.readTimeout, true);
+            if (endpoint.bufferWrites) {
+                mllpSocketWriter = new MllpBufferedSocketWriter(socket, false);
+            } else {
+                mllpSocketWriter = new MllpSocketWriter(socket, false);
             }
         }
-
-        return null;
     }
 
 }