You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by qu...@apache.org on 2018/03/03 16:48:54 UTC

[camel] branch master updated: CAMEL-12315 - correct auto-acknowledgment issues

This is an automated email from the ASF dual-hosted git repository.

quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b19e60  CAMEL-12315 - correct auto-acknowledgment issues
9b19e60 is described below

commit 9b19e60dbe42df472aefa45e10fbfd1f3436a43e
Author: Quinn Stevenson <qu...@apache.org>
AuthorDate: Sat Mar 3 09:47:50 2018 -0700

    CAMEL-12315 - correct auto-acknowledgment issues
---
 .../MllpAcknowledgementGenerationException.java}   |  24 ++---
 .../apache/camel/component/mllp/MllpConstants.java |   1 +
 .../component/mllp/MllpTcpServerConsumer.java      |  46 +++++---
 .../camel/component/mllp/internal/Hl7Util.java     |  51 ++++++---
 .../Hl7AcknowledgementGenerationException.java     |  49 +++++++--
 .../mllp/Hl7AcknowledgementGenerator.java          |   6 +-
 ...oAcknowledgementWithBridgeErrorHandlerTest.java | 118 +++++++++++++++++++++
 ...knowledgementWithoutBridgeErrorHandlerTest.java | 113 ++++++++++++++++++++
 ...lAcknowledgementWithBridgeErrorHandlerTest.java | 107 +++++++++++++++++++
 ...knowledgementWithoutBridgeErrorHandlerTest.java | 107 +++++++++++++++++++
 ...umerOptionalEndOfDataWithoutValidationTest.java |   2 -
 ...umerRequiredEndOfDataWithoutValidationTest.java |   4 -
 ...pServerConsumerAcknowledgementTestSupport.java} | 109 ++++++++++---------
 .../camel/component/mllp/internal/Hl7UtilTest.java |  54 ++++++----
 14 files changed, 657 insertions(+), 134 deletions(-)

diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementGenerationException.java
similarity index 54%
copy from components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
copy to components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementGenerationException.java
index 09d7bbb..d857b42 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementGenerationException.java
@@ -15,31 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.camel.processor.mllp;
+package org.apache.camel.component.mllp;
 
 /*
- * Exception thrown by the HL7AcknowledgmentGenerator in the event of a failure.
+ * Exception thrown by MLLP Consumer if autoAck is set to true and an acknowledgement cannot be generated.
  */
-public class Hl7AcknowledgementGenerationException extends Exception {
-    private final byte[] hl7Message;
+public class MllpAcknowledgementGenerationException extends MllpException {
 
-    public Hl7AcknowledgementGenerationException(String message) {
+    public MllpAcknowledgementGenerationException(String message) {
         super(message);
-        this.hl7Message = null;
     }
 
-    public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message) {
-        super(message);
-        this.hl7Message = hl7Message;
+    public MllpAcknowledgementGenerationException(String message, byte[] hl7MessageBytes) {
+        super(message, hl7MessageBytes);
     }
 
-    public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message, Throwable cause) {
-        super(message, cause);
-        this.hl7Message = hl7Message;
+    public MllpAcknowledgementGenerationException(String message, byte[] hl7MessageBytes, Throwable cause) {
+        super(message, hl7MessageBytes, cause);
     }
 
-
-    public byte[] getHl7Message() {
-        return hl7Message;
-    }
 }
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
index a2d9ae7..781a9ec 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
@@ -27,6 +27,7 @@ public final class MllpConstants {
     public static final String MLLP_ACKNOWLEDGEMENT = "CamelMllpAcknowledgement";
     public static final String MLLP_ACKNOWLEDGEMENT_STRING = "CamelMllpAcknowledgementString";
     public static final String MLLP_ACKNOWLEDGEMENT_TYPE = "CamelMllpAcknowledgementType";
+    public static final String MLLP_ACKNOWLEDGEMENT_MSA_TEXT = "CamelMllpAcknowledgementMsaText";
 
     public static final String MLLP_ACKNOWLEDGEMENT_EXCEPTION = "CamelMllpAcknowledgementException";
     public static final String MLLP_AUTO_ACKNOWLEDGE = "CamelMllpAutoAcknowledge";
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index c1ee5dc..f2fb10a 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -230,6 +230,11 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         }
         try {
             createUoW(exchange);
+
+            if (exchange.hasProperties() || exchange.getProperty(MllpConstants.MLLP_AUTO_ACKNOWLEDGE) == null) {
+                exchange.setProperty(MllpConstants.MLLP_AUTO_ACKNOWLEDGE, getConfiguration().isAutoAck());
+            }
+
             Message message = exchange.getIn();
 
             if (consumerRunnable.hasLocalAddress()) {
@@ -240,10 +245,6 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                 message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, consumerRunnable.getRemoteAddress());
             }
 
-            if (message.hasHeaders() && message.getHeader(MllpConstants.MLLP_AUTO_ACKNOWLEDGE) == null) {
-                message.setHeader(MllpConstants.MLLP_AUTO_ACKNOWLEDGE, getConfiguration().isAutoAck());
-            }
-
             if (getConfiguration().isValidatePayload()) {
                 String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(hl7MessageBytes);
                 if (exceptionMessage != null) {
@@ -399,24 +400,29 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
             if (!autoAck) {
                 Object acknowledgementBytesProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT);
                 Object acknowledgementStringProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING);
-                if (acknowledgementBytesProperty == null && acknowledgementStringProperty == null) {
-                    final String exceptionMessage = "Automatic Acknowledgement is disabled and the "
-                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + " and " + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + " exchange properties are null";
-                    exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes));
-                } else {
-                    final String exceptionMessage = "Automatic Acknowledgement is disabled and neither the "
-                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + "(type = " + acknowledgementBytesProperty.getClass().getSimpleName() + ") nor  the"
+                final String exceptionMessage = (acknowledgementBytesProperty == null && acknowledgementStringProperty == null)
+                    ? "Automatic Acknowledgement is disabled and the "
+                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + " and " + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + " exchange properties are null"
+                    : "Automatic Acknowledgement is disabled and neither the "
+                        + MllpConstants.MLLP_ACKNOWLEDGEMENT + "(type = " + acknowledgementBytesProperty.getClass().getSimpleName() + ") nor the"
                         + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + "(type = " + acknowledgementBytesProperty.getClass().getSimpleName() + ") exchange properties can be converted to byte[]";
-                    exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes));
-                }
+                MllpInvalidAcknowledgementException invalidAckEx = new MllpInvalidAcknowledgementException(exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes);
+                exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, invalidAckEx);
+                handleException(invalidAckEx);
             } else {
                 String acknowledgmentTypeProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
+                String msa3 = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_MSA_TEXT, String.class);
+                Exception exchangeEx = exchange.getException();
+
                 try {
                     if (null == acknowledgmentTypeProperty) {
-                        if (null == exchange.getException()) {
+                        if (null == exchangeEx) {
                             acknowledgementMessageType = "AA";
                         } else {
                             acknowledgementMessageType = "AE";
+                            if (msa3 == null || msa3.isEmpty()) {
+                                msa3 = exchangeEx.getClass().getName();
+                            }
                         }
                     } else {
                         switch (acknowledgmentTypeProperty) {
@@ -425,9 +431,15 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                             break;
                         case "AE":
                             acknowledgementMessageType = "AE";
+                            if (exchangeEx != null && msa3 != null && msa3.isEmpty()) {
+                                msa3 = exchangeEx.getClass().getName();
+                            }
                             break;
                         case "AR":
                             acknowledgementMessageType = "AR";
+                            if (exchangeEx != null && msa3 != null && msa3.isEmpty()) {
+                                msa3 = exchangeEx.getClass().getName();
+                            }
                             break;
                         default:
                             exchange.setException(new Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + acknowledgmentTypeProperty));
@@ -435,11 +447,11 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                         }
                     }
 
-                    Hl7Util.generateAcknowledgementPayload(consumerRunnable.getMllpBuffer(), originalHl7MessageBytes, acknowledgementMessageType);
+                    Hl7Util.generateAcknowledgementPayload(consumerRunnable.getMllpBuffer(), originalHl7MessageBytes, acknowledgementMessageType, msa3);
 
-                } catch (Hl7AcknowledgementGenerationException ackGenerationException) {
+                } catch (MllpAcknowledgementGenerationException ackGenerationException) {
                     exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, ackGenerationException);
-                    exchange.setException(ackGenerationException);
+                    handleException(ackGenerationException);
                 }
             }
         } else {
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/Hl7Util.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/Hl7Util.java
index 432ae5c..3766290 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/Hl7Util.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/Hl7Util.java
@@ -17,14 +17,14 @@
 
 package org.apache.camel.component.mllp.internal;
 
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.component.mllp.MllpAcknowledgementGenerationException;
 import org.apache.camel.component.mllp.MllpComponent;
 import org.apache.camel.component.mllp.MllpProtocolConstants;
 import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException;
@@ -37,6 +37,8 @@ public final class Hl7Util {
 
     public static final Map<Character, String> CHARACTER_REPLACEMENTS;
 
+    public static final SimpleDateFormat TIMESTAMP_FORMAT = new SimpleDateFormat("yyyyMMddHHmmssSSSZZZZ");
+
     static final int STRING_BUFFER_PAD_SIZE = 100;
 
     static final Logger LOG = LoggerFactory.getLogger(Hl7Util.class);
@@ -193,23 +195,31 @@ public final class Hl7Util {
     }
 
 
-    public static void generateAcknowledgementPayload(MllpSocketBuffer mllpSocketBuffer, byte[] hl7MessageBytes, String acknowledgementCode) throws Hl7AcknowledgementGenerationException {
+    public static void generateAcknowledgementPayload(MllpSocketBuffer mllpSocketBuffer, byte[] hl7MessageBytes, String acknowledgementCode)
+        throws MllpAcknowledgementGenerationException {
+        generateAcknowledgementPayload(mllpSocketBuffer, hl7MessageBytes, acknowledgementCode, null);
+    }
+
+    public static void generateAcknowledgementPayload(MllpSocketBuffer mllpSocketBuffer, byte[] hl7MessageBytes, String acknowledgementCode, String msa3)
+        throws MllpAcknowledgementGenerationException {
         if (hl7MessageBytes == null) {
-            throw new Hl7AcknowledgementGenerationException("Null HL7 message received for parsing operation");
+            throw new MllpAcknowledgementGenerationException("Null HL7 message received for parsing operation");
         }
 
         List<Integer> fieldSeparatorIndexes = findFieldSeparatorIndicesInSegment(hl7MessageBytes, 0);
 
         if (fieldSeparatorIndexes.isEmpty()) {
-            throw new Hl7AcknowledgementGenerationException("Failed to find the end of the MSH Segment while attempting to generate response", hl7MessageBytes);
+            throw new MllpAcknowledgementGenerationException("Failed to find the end of the MSH Segment while attempting to generate response", hl7MessageBytes);
         }
 
         if (fieldSeparatorIndexes.size() < 8) {
-            String exceptionMessage = String.format("Insufficient number of fields in MSH-2 in MSH to generate a response - 10 are required but %d were found", fieldSeparatorIndexes.size() - 1);
+            String exceptionMessage = String.format("Insufficient number of fields found in MSH to generate a response - 10 are required but %d were found", fieldSeparatorIndexes.size() - 1);
 
-            throw new Hl7AcknowledgementGenerationException(exceptionMessage, hl7MessageBytes);
+            throw new MllpAcknowledgementGenerationException(exceptionMessage, hl7MessageBytes);
         }
 
+        final byte fieldSeparator = hl7MessageBytes[3];
+
         // Start building the MLLP Envelope
         mllpSocketBuffer.openMllpEnvelope();
 
@@ -219,10 +229,13 @@ public final class Hl7Util {
         writeFieldToBuffer(4, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes); // MSH-6
         writeFieldToBuffer(1, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes); // MSH-3
         writeFieldToBuffer(2, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes); // MSH-4
-        writeFieldToBuffer(5, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes); // MSH-7
-        writeFieldToBuffer(6, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes); // MSH-8
 
-        final byte fieldSeparator = hl7MessageBytes[3];
+        // MSH-7
+        mllpSocketBuffer.write(fieldSeparator);
+        mllpSocketBuffer.write(TIMESTAMP_FORMAT.format(new Date()).getBytes());
+
+        // Don't copy MSH-8
+        mllpSocketBuffer.write(fieldSeparator);
 
         // Need to generate the correct MSH-9
         mllpSocketBuffer.write(fieldSeparator);
@@ -243,8 +256,14 @@ public final class Hl7Util {
             mllpSocketBuffer.write(hl7MessageBytes, msh92start, fieldSeparatorIndexes.get(8) - msh92start);
         }
 
+        // MSH-10 - use the original control ID, but add an "A" as a suffix
+        writeFieldToBuffer(8, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes);
+        if (fieldSeparatorIndexes.get(9) - fieldSeparatorIndexes.get(8) > 1) {
+            mllpSocketBuffer.write('A');
+        }
+
         // MSH-10 through the end of the MSH
-        mllpSocketBuffer.write(hl7MessageBytes, fieldSeparatorIndexes.get(8), fieldSeparatorIndexes.get(fieldSeparatorIndexes.size() - 1) - fieldSeparatorIndexes.get(8));
+        mllpSocketBuffer.write(hl7MessageBytes, fieldSeparatorIndexes.get(9), fieldSeparatorIndexes.get(fieldSeparatorIndexes.size() - 1) - fieldSeparatorIndexes.get(9));
 
         mllpSocketBuffer.write(MllpProtocolConstants.SEGMENT_DELIMITER);
 
@@ -252,7 +271,13 @@ public final class Hl7Util {
         mllpSocketBuffer.write("MSA".getBytes(), 0, 3);
         mllpSocketBuffer.write(fieldSeparator);
         mllpSocketBuffer.write(acknowledgementCode.getBytes(), 0, 2);
-        writeFieldToBuffer(8, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes); // MSH-10
+        writeFieldToBuffer(8, mllpSocketBuffer, hl7MessageBytes, fieldSeparatorIndexes); // MSH-10 => MSA-2
+
+        if (msa3 != null && !msa3.isEmpty()) {
+            mllpSocketBuffer.write(fieldSeparator);
+            mllpSocketBuffer.write(msa3.getBytes());
+        }
+
         mllpSocketBuffer.write(MllpProtocolConstants.SEGMENT_DELIMITER);
 
         // Close the MLLP Envelope
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
index 09d7bbb..2806218 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerationException.java
@@ -17,29 +17,62 @@
 
 package org.apache.camel.processor.mllp;
 
+import org.apache.camel.component.mllp.internal.Hl7Util;
+
+
 /*
  * Exception thrown by the HL7AcknowledgmentGenerator in the event of a failure.
  */
 public class Hl7AcknowledgementGenerationException extends Exception {
-    private final byte[] hl7Message;
+    private final byte[] hl7MessageBytes;
 
     public Hl7AcknowledgementGenerationException(String message) {
         super(message);
-        this.hl7Message = null;
+        this.hl7MessageBytes = null;
     }
 
-    public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message) {
+    public Hl7AcknowledgementGenerationException(String message, byte[] hl7MessageBytes) {
         super(message);
-        this.hl7Message = hl7Message;
+        this.hl7MessageBytes = hl7MessageBytes;
     }
 
-    public Hl7AcknowledgementGenerationException(String message, byte[] hl7Message, Throwable cause) {
+    public Hl7AcknowledgementGenerationException(String message, byte[] hl7MessageBytes, Throwable cause) {
         super(message, cause);
-        this.hl7Message = hl7Message;
+        this.hl7MessageBytes = hl7MessageBytes;
+    }
+
+
+    public boolean hasHl7MessageBytes() {
+        return hl7MessageBytes != null && hl7MessageBytes.length > 0;
+    }
+
+    public byte[] getHl7MessageBytes() {
+        return hl7MessageBytes;
     }
 
+    /**
+     * Override the base version of this method, and include the HL7 Message and Acknowledgement, if any
+     *
+     * @return the detail message of this MLLP Exception
+     */
+    @Override
+    public String getMessage() {
+        if (hasHl7MessageBytes()) {
+            String parentMessage = super.getMessage();
+
+            StringBuilder messageBuilder = new StringBuilder(parentMessage.length() + hl7MessageBytes.length);
+
+            messageBuilder.append(parentMessage).append("\n\t{hl7MessageBytes [")
+                .append(hl7MessageBytes.length)
+                .append("] = ");
+
+            Hl7Util.appendBytesAsPrintFriendlyString(messageBuilder, hl7MessageBytes, 0, hl7MessageBytes.length);
+
+            messageBuilder.append('}');
+
+            return messageBuilder.toString();
+        }
 
-    public byte[] getHl7Message() {
-        return hl7Message;
+        return super.getMessage();
     }
 }
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
index 55aeee8..ea89307 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/processor/mllp/Hl7AcknowledgementGenerator.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A Camel Processor for generating HL7 Acknowledgements.
+ * An example of a Camel Processor for generating HL7 Acknowledgements.
  */
 public class Hl7AcknowledgementGenerator implements Processor {
     private static final Logger LOG = LoggerFactory.getLogger(Hl7AcknowledgementGenerator.class);
@@ -98,11 +98,11 @@ public class Hl7AcknowledgementGenerator implements Processor {
         }
 
         if (-1 == endOfMSH) {
-            throw new Hl7AcknowledgementGenerationException("Failed to find the end of the  MSH Segment while attempting to generate response", hl7MessageBytes);
+            throw new Hl7AcknowledgementGenerationException("Failed to find the end of the MSH Segment while attempting to generate response", hl7MessageBytes);
         }
 
         if (8 > fieldSeparatorIndexes.size()) {
-            throw new Hl7AcknowledgementGenerationException("Insufficient number of fields in after MSH-2 in MSH to generate a response - 8 are required but "
+            throw new Hl7AcknowledgementGenerationException("Insufficient number of fields in MSH to generate a response - 8 are required but "
                 + fieldSeparatorIndexes.size() + " " + "were found", hl7MessageBytes);
         }
 
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAutoAcknowledgementWithBridgeErrorHandlerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAutoAcknowledgementWithBridgeErrorHandlerTest.java
new file mode 100644
index 0000000..6dafbd6
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAutoAcknowledgementWithBridgeErrorHandlerTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.startsWith;
+
+
+public class MllpTcpServerConsumerAutoAcknowledgementWithBridgeErrorHandlerTest extends TcpServerConsumerAcknowledgementTestSupport {
+    @Override
+    protected boolean isBridgeErrorHandler() {
+        return true;
+    }
+
+    @Override
+    protected boolean isAutoAck() {
+        return true;
+    }
+
+    @Test
+    public void testReceiveSingleMessage() throws Exception {
+        result.expectedBodiesReceived(TEST_MESSAGE);
+
+        complete.expectedBodiesReceived(TEST_MESSAGE);
+        complete.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+
+        receiveSingleMessage();
+
+        Exchange completeExchange = complete.getReceivedExchanges().get(0);
+
+        assertNotNull(completeExchange.getIn().getHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT));
+        assertNotNull(completeExchange.getIn().getHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING));
+
+        String acknowledgement = completeExchange.getIn().getHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, String.class);
+
+        assertThat(acknowledgement, startsWith("MSH|^~\\&|^org^sys||APP_A|FAC_A|"));
+        assertThat(acknowledgement, endsWith("||ACK^A04^ADT_A04|||2.6\rMSA|AA|\r"));
+    }
+
+    public void testAcknowledgementDeliveryFailure() throws Exception {
+        result.expectedBodiesReceived(TEST_MESSAGE);
+
+        failure.expectedBodiesReceived(TEST_MESSAGE);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_ACKNOWLEDGEMENT);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_ACKNOWLEDGEMENT);
+
+        acknowledgementDeliveryFailure();
+
+        Exchange failureExchange = failure.getExchanges().get(0);
+        Object failureException = failureExchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION);
+        assertNotNull("OnFailureOnly exchange should have a " + MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION + " property", failureException);
+        assertIsInstanceOf(Exception.class, failureException);
+    }
+
+    @Test
+    public void testUnparsableMessage() throws Exception {
+        final String testMessage = "MSH" + TEST_MESSAGE;
+
+        result.expectedBodiesReceived(testMessage);
+        complete.expectedMessageCount(1);
+        ackGenerationEx.expectedMessageCount(1);
+
+        unparsableMessage(testMessage);
+
+        assertNull("Should not have the exception in the exchange property", result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull("Should not have the exception in the exchange property", complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+
+        assertNotNull("Should have the exception in the exchange property", ackGenerationEx.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+
+    @Test
+    public void testMessageWithEmptySegment() throws Exception {
+        final String testMessage = TEST_MESSAGE.replace("\rPID|", "\r\rPID|");
+
+        result.expectedBodiesReceived(testMessage);
+        complete.expectedMessageCount(1);
+
+        unparsableMessage(testMessage);
+
+        assertNull("Should not have the exception in the exchange property", result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull("Should not have the exception in the exchange property", complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+
+
+    @Test
+    public void testMessageWithEmbeddedNewlines() throws Exception {
+        final String testMessage = TEST_MESSAGE.replace("\rPID|", "\r\n\rPID|\n");
+
+        result.expectedBodiesReceived(testMessage);
+        complete.expectedMessageCount(1);
+
+        unparsableMessage(testMessage);
+
+        assertNull("Should not have the exception in the exchange property", result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull("Should not have the exception in the exchange property", complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+}
+
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAutoAcknowledgementWithoutBridgeErrorHandlerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAutoAcknowledgementWithoutBridgeErrorHandlerTest.java
new file mode 100644
index 0000000..51a8237
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAutoAcknowledgementWithoutBridgeErrorHandlerTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.startsWith;
+
+
+public class MllpTcpServerConsumerAutoAcknowledgementWithoutBridgeErrorHandlerTest extends TcpServerConsumerAcknowledgementTestSupport {
+    @Override
+    protected boolean isBridgeErrorHandler() {
+        return false;
+    }
+
+    @Override
+    protected boolean isAutoAck() {
+        return true;
+    }
+
+    @Test
+    public void testReceiveSingleMessage() throws Exception {
+        result.expectedBodiesReceived(TEST_MESSAGE);
+
+        complete.expectedBodiesReceived(TEST_MESSAGE);
+        complete.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+
+        receiveSingleMessage();
+
+        Exchange completeExchange = complete.getReceivedExchanges().get(0);
+
+        assertNotNull(completeExchange.getIn().getHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT));
+        assertNotNull(completeExchange.getIn().getHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING));
+
+        String acknowledgement = completeExchange.getIn().getHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, String.class);
+
+        assertThat(acknowledgement, startsWith("MSH|^~\\&|^org^sys||APP_A|FAC_A|"));
+        assertThat(acknowledgement, endsWith("||ACK^A04^ADT_A04|||2.6\rMSA|AA|\r"));
+    }
+
+    public void testAcknowledgementDeliveryFailure() throws Exception {
+        result.expectedBodiesReceived(TEST_MESSAGE);
+
+        failure.expectedBodiesReceived(TEST_MESSAGE);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_ACKNOWLEDGEMENT);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_ACKNOWLEDGEMENT);
+
+        acknowledgementDeliveryFailure();
+
+        Exchange failureExchange = failure.getExchanges().get(0);
+        Object failureException = failureExchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION);
+        assertNotNull("OnFailureOnly exchange should have a " + MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION + " property", failureException);
+        assertIsInstanceOf(Exception.class, failureException);
+    }
+
+    @Test
+    public void testUnparsableMessage() throws Exception {
+        final String testMessage = "MSH" + TEST_MESSAGE;
+
+        result.expectedBodiesReceived(testMessage);
+        complete.expectedMessageCount(1);
+
+        unparsableMessage(testMessage);
+
+        assertNull("Should not have the exception in the exchange property", result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull("Should not have the exception in the exchange property", complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+
+    @Test
+    public void testMessageWithEmptySegment() throws Exception {
+        final String testMessage = TEST_MESSAGE.replace("\rPID|", "\r\rPID|");
+
+        result.expectedBodiesReceived(testMessage);
+        complete.expectedMessageCount(1);
+
+        unparsableMessage(testMessage);
+
+        assertNull("Should not have the exception in the exchange property", result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull("Should not have the exception in the exchange property", complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+
+
+    @Test
+    public void testMessageWithEmbeddedNewlines() throws Exception {
+        final String testMessage = TEST_MESSAGE.replace("\rPID|", "\r\n\rPID|\n");
+
+        result.expectedBodiesReceived(testMessage);
+        complete.expectedMessageCount(1);
+
+        unparsableMessage(testMessage);
+
+        assertNull("Should not have the exception in the exchange property", result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull("Should not have the exception in the exchange property", complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+}
+
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerManualAcknowledgementWithBridgeErrorHandlerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerManualAcknowledgementWithBridgeErrorHandlerTest.java
new file mode 100644
index 0000000..bd30ba1
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerManualAcknowledgementWithBridgeErrorHandlerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+
+public class MllpTcpServerConsumerManualAcknowledgementWithBridgeErrorHandlerTest extends TcpServerConsumerAcknowledgementTestSupport {
+    private static final String NO_EXCEPTION_EXPECTED = "Should not have the exception in the exchange property";
+
+    /** Supplies the bridgeErrorHandler flag this variant runs with (enabled). */
+    @Override
+    protected boolean isBridgeErrorHandler() {
+        return true;
+    }
+
+    /** Supplies the autoAck flag this variant runs with (disabled, i.e. manual acknowledgement). */
+    @Override
+    protected boolean isAutoAck() {
+        return false;
+    }
+
+    /** Expects the test message at both mocks, one exchange at the invalid-ack mock, and no acknowledgement headers. */
+    @Test
+    public void testReceiveSingleMessage() throws Exception {
+        invalidAckEx.expectedMessageCount(1);
+        complete.expectedBodiesReceived(TEST_MESSAGE);
+        result.expectedBodiesReceived(TEST_MESSAGE);
+
+        receiveSingleMessage();
+
+        final Exchange finished = complete.getReceivedExchanges().get(0);
+        for (String ackHeader : new String[] {MllpConstants.MLLP_ACKNOWLEDGEMENT, MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING}) {
+            assertNull(finished.getIn().getHeader(ackHeader));
+        }
+    }
+
+    /**
+     * Expectations for a client that resets the connection right after sending the test message.
+     * NOTE(review): this method carries no {@code @Test} annotation, so a JUnit 4 runner will not execute it;
+     * confirm whether that is intentional before relying on it.
+     */
+    public void testAcknowledgementDeliveryFailure() throws Exception {
+        result.expectedBodiesReceived(TEST_MESSAGE);
+        failure.expectedBodiesReceived(TEST_MESSAGE);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_ACKNOWLEDGEMENT);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_ACKNOWLEDGEMENT);
+
+        acknowledgementDeliveryFailure();
+
+        final Object ackFailure = failure.getExchanges().get(0).getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION);
+        assertNotNull("OnFailureOnly exchange should have a " + MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION + " property", ackFailure);
+        assertIsInstanceOf(Exception.class, ackFailure);
+    }
+
+    /** Sends the test message with an extra "MSH" prepended to it. */
+    @Test
+    public void testUnparsableMessage() throws Exception {
+        sendAndVerify("MSH" + TEST_MESSAGE);
+    }
+
+    /** Sends the test message with a doubled carriage return in front of the PID segment. */
+    @Test
+    public void testMessageWithEmptySegment() throws Exception {
+        sendAndVerify(TEST_MESSAGE.replace("\rPID|", "\r\rPID|"));
+    }
+
+    /** Sends the test message with line feeds inserted around the start of the PID segment. */
+    @Test
+    public void testMessageWithEmbeddedNewlines() throws Exception {
+        sendAndVerify(TEST_MESSAGE.replace("\rPID|", "\r\n\rPID|\n"));
+    }
+
+    /**
+     * Shared body of the malformed-payload tests: one exchange at each mock and no caught-exception property.
+     *
+     * @param payload the data to send; the result mock must receive it unchanged
+     * @throws Exception if sending fails or an expectation is not met
+     */
+    private void sendAndVerify(String payload) throws Exception {
+        result.expectedBodiesReceived(payload);
+        complete.expectedMessageCount(1);
+        invalidAckEx.expectedMessageCount(1);
+
+        unparsableMessage(payload);
+
+        assertNull(NO_EXCEPTION_EXPECTED, result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull(NO_EXCEPTION_EXPECTED, complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+}
+
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerManualAcknowledgementWithoutBridgeErrorHandlerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerManualAcknowledgementWithoutBridgeErrorHandlerTest.java
new file mode 100644
index 0000000..440a052
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerManualAcknowledgementWithoutBridgeErrorHandlerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.startsWith;
+
+
+public class MllpTcpServerConsumerManualAcknowledgementWithoutBridgeErrorHandlerTest extends TcpServerConsumerAcknowledgementTestSupport {
+    private static final String NO_EXCEPTION_EXPECTED = "Should not have the exception in the exchange property";
+
+    /** Supplies the bridgeErrorHandler flag this variant runs with (disabled). */
+    @Override
+    protected boolean isBridgeErrorHandler() {
+        return false;
+    }
+
+    /** Supplies the autoAck flag this variant runs with (disabled, i.e. manual acknowledgement). */
+    @Override
+    protected boolean isAutoAck() {
+        return false;
+    }
+
+    /** Expects the test message at the result and on-complete mocks, with no acknowledgement headers set. */
+    @Test
+    public void testReceiveSingleMessage() throws Exception {
+        complete.expectedBodiesReceived(TEST_MESSAGE);
+        result.expectedBodiesReceived(TEST_MESSAGE);
+
+        receiveSingleMessage();
+
+        final Exchange finished = complete.getReceivedExchanges().get(0);
+        for (String ackHeader : new String[] {MllpConstants.MLLP_ACKNOWLEDGEMENT, MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING}) {
+            assertNull(finished.getIn().getHeader(ackHeader));
+        }
+    }
+
+    /**
+     * Expectations for a client that resets the connection right after sending the test message.
+     * NOTE(review): this method carries no {@code @Test} annotation, so a JUnit 4 runner will not execute it;
+     * confirm whether that is intentional before relying on it.
+     */
+    public void testAcknowledgementDeliveryFailure() throws Exception {
+        result.expectedBodiesReceived(TEST_MESSAGE);
+        failure.expectedBodiesReceived(TEST_MESSAGE);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, EXPECTED_ACKNOWLEDGEMENT);
+        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, EXPECTED_ACKNOWLEDGEMENT);
+
+        acknowledgementDeliveryFailure();
+
+        final Object ackFailure = failure.getExchanges().get(0).getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION);
+        assertNotNull("OnFailureOnly exchange should have a " + MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION + " property", ackFailure);
+        assertIsInstanceOf(Exception.class, ackFailure);
+    }
+
+    /** Sends the test message with an extra "MSH" prepended to it. */
+    @Test
+    public void testUnparsableMessage() throws Exception {
+        sendAndVerify("MSH" + TEST_MESSAGE);
+    }
+
+    /** Sends the test message with a doubled carriage return in front of the PID segment. */
+    @Test
+    public void testMessageWithEmptySegment() throws Exception {
+        sendAndVerify(TEST_MESSAGE.replace("\rPID|", "\r\rPID|"));
+    }
+
+    /** Sends the test message with line feeds inserted around the start of the PID segment. */
+    @Test
+    public void testMessageWithEmbeddedNewlines() throws Exception {
+        sendAndVerify(TEST_MESSAGE.replace("\rPID|", "\r\n\rPID|\n"));
+    }
+
+    /**
+     * Shared body of the malformed-payload tests: one exchange at each mock and no caught-exception property.
+     * @param payload the data to send; the result mock must receive it unchanged
+     * @throws Exception if sending fails or an expectation is not met
+     */
+    private void sendAndVerify(String payload) throws Exception {
+        result.expectedBodiesReceived(payload);
+        complete.expectedMessageCount(1);
+
+        unparsableMessage(payload);
+
+        assertNull(NO_EXCEPTION_EXPECTED, result.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+        assertNull(NO_EXCEPTION_EXPECTED, complete.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+}
+
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
index cc1c7d4..492574e 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
@@ -47,8 +47,6 @@ public class MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest extends
 
     @Override
     public void testNthInvalidMessage() throws Exception {
-        expectedFailedCount = 1;
-
         runNthInvalidMessage();
     }
 
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
index 4abe439..2e8c9c7 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
@@ -37,15 +37,11 @@ public class MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest extends
 
     @Override
     public void testInvalidMessage() throws Exception {
-        expectedFailedCount = 1;
-
         runNthInvalidMessage();
     }
 
     @Override
     public void testNthInvalidMessage() throws Exception {
-        expectedFailedCount = 1;
-
         runNthInvalidMessage();
     }
 
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerAcknowledgementTestSupport.java
similarity index 54%
rename from components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java
rename to components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerAcknowledgementTestSupport.java
index 9f7173e..5cec934 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerAcknowledgementTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerAcknowledgementTestSupport.java
@@ -21,18 +21,27 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
 import org.apache.camel.test.junit4.CamelTestSupport;
+
 import org.junit.Rule;
-import org.junit.Test;
 
-public class MllpTcpServerConsumerAcknowledgementTest extends CamelTestSupport {
+
+public abstract class TcpServerConsumerAcknowledgementTestSupport extends CamelTestSupport {
+    static final String TEST_MESSAGE =
+        "MSH|^~\\&|APP_A|FAC_A|^org^sys||||ADT^A04^ADT_A04|||2.6" + '\r'
+        + "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S" + '\r';
+
+    static final String EXPECTED_ACKNOWLEDGEMENT =
+        "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ADT_A04|||2.6" + '\r'
+        + "MSA|AA|" + '\r';
+
     @Rule
     public MllpClientResource mllpClient = new MllpClientResource();
 
@@ -45,6 +54,24 @@ public class MllpTcpServerConsumerAcknowledgementTest extends CamelTestSupport {
     @EndpointInject(uri = "mock://on-failure-only")
     MockEndpoint failure;
 
+    @EndpointInject(uri = "mock://invalid-ack-ex")
+    MockEndpoint invalidAckEx;
+
+
+    @EndpointInject(uri = "mock://ack-generation-ex")
+    MockEndpoint ackGenerationEx;
+
+    @Override
+    protected void doPostSetup() throws Exception {
+        super.doPostSetup();
+
+        result.expectedMessageCount(0);
+        complete.expectedMessageCount(0);
+        failure.expectedMessageCount(0);
+        invalidAckEx.expectedMessageCount(0);
+        ackGenerationEx.expectedMessageCount(0);
+    }
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
         DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
@@ -68,81 +95,65 @@ public class MllpTcpServerConsumerAcknowledgementTest extends CamelTestSupport {
             public void configure() throws Exception {
                 String routeId = "mllp-test-receiver-route";
 
+                onException(MllpInvalidAcknowledgementException.class)
+                    .handled(false)
+                    .to("mock://invalid-ack-ex");
+
+                onException(MllpAcknowledgementGenerationException.class)
+                    .handled(false)
+                    .to("mock://ack-generation-ex");
+
                 onCompletion()
                     .onCompleteOnly()
                     .log(LoggingLevel.INFO, routeId, "Test route complete")
                     .to("mock://on-complete-only");
+
                 onCompletion()
                     .onFailureOnly()
                     .log(LoggingLevel.INFO, routeId, "Test route complete")
                     .to("mock://on-failure-only");
-                fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d",
-                    mllpClient.getMllpHost(), mllpClient.getMllpPort(), connectTimeout, responseTimeout)
+
+                fromF("mllp://%s:%d?bridgeErrorHandler=%b&autoAck=%b&connectTimeout=%d&receiveTimeout=%d",
+                    mllpClient.getMllpHost(), mllpClient.getMllpPort(), isBridgeErrorHandler(), isAutoAck(), connectTimeout, responseTimeout)
                     .routeId(routeId)
                     .to(result);
             }
         };
     }
 
-    @Test
-    public void testReceiveSingleMessage() throws Exception {
-        final String testMessage =
-            "MSH|^~\\&|APP_A|FAC_A|^org^sys||||ADT^A04^ADT_A04|||2.6" + '\r'
-                + "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S" + '\r';
-
-        final String expectedAcknowledgement =
-            "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ADT_A04|||2.6" + '\r'
-                + "MSA|AA|" + '\r';
-
-        result.expectedBodiesReceived(testMessage);
-
-        complete.expectedBodiesReceived(testMessage);
-        complete.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
-        complete.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, expectedAcknowledgement);
-        complete.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, expectedAcknowledgement);
+    protected abstract boolean isBridgeErrorHandler();
+    protected abstract boolean isAutoAck();
 
-        failure.expectedMessageCount(0);
+    public void receiveSingleMessage() throws Exception {
+        NotifyBuilder done = new NotifyBuilder(context).whenDone(1).create();
 
         mllpClient.connect();
 
-        String acknowledgement = mllpClient.sendMessageAndWaitForAcknowledgement(testMessage, 10000);
+        mllpClient.sendFramedData(TEST_MESSAGE);
 
-        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+        assertTrue("Exchange should have completed", done.matches(10, TimeUnit.SECONDS));
 
-        assertEquals("Unexpected Acknowledgement", expectedAcknowledgement, acknowledgement);
+        assertMockEndpointsSatisfied();
     }
 
-    @Test
-    public void testAcknowledgementDeliveryFailure() throws Exception {
-        final String testMessage =
-            "MSH|^~\\&|APP_A|FAC_A|^org^sys||||ADT^A04^ADT_A04|||2.6" + '\r'
-                + "PID|1||1100832^^^^PI||TEST^FIG||98765432|U||R|435 MAIN STREET^^LONGMONT^CO^80503||123-456-7890|||S" + '\r';
-
-        final String expectedAcknowledgement =
-            "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ADT_A04|||2.6" + '\r'
-                + "MSA|AA|" + '\r';
-
-        result.expectedBodiesReceived(testMessage);
-
-        complete.expectedMessageCount(0);
-
-        failure.expectedBodiesReceived(testMessage);
-        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
-        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT, expectedAcknowledgement);
-        failure.expectedHeaderReceived(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, expectedAcknowledgement);
-
+    public void acknowledgementDeliveryFailure() throws Exception {
         boolean disconnectAfterSend = true;
         mllpClient.setDisconnectMethod(MllpClientResource.DisconnectMethod.RESET);
         mllpClient.connect();
 
-        mllpClient.sendFramedData(testMessage, disconnectAfterSend);
+        mllpClient.sendFramedData(TEST_MESSAGE, disconnectAfterSend);
 
         assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+    }
+
+    public void unparsableMessage(String testMessage) throws Exception {
+        NotifyBuilder done = new NotifyBuilder(context).whenDone(1).create();
+        // Connect, send the framed payload, then wait up to five seconds for one exchange to be done.
+        mllpClient.connect();
+        mllpClient.sendFramedData(testMessage);
 
-        Exchange failureExchange = failure.getExchanges().get(0);
-        Object failureException = failureExchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION);
-        assertNotNull("OnFailureOnly exchange should have a " + MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION + " property", failureException);
-        assertIsInstanceOf(Exception.class, failureException);
+        assertTrue("One exchange should have completed", done.matches(5, TimeUnit.SECONDS));
+        assertMockEndpointsSatisfied();
     }
 }
 
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/Hl7UtilTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/Hl7UtilTest.java
index 9ba18b4..fca490f 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/Hl7UtilTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/Hl7UtilTest.java
@@ -19,14 +19,17 @@ package org.apache.camel.component.mllp.internal;
 
 import java.io.ByteArrayOutputStream;
 
+import org.apache.camel.component.mllp.MllpAcknowledgementGenerationException;
 import org.apache.camel.component.mllp.MllpProtocolConstants;
 
-import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException;
 import org.apache.camel.test.stub.camel.MllpEndpointStub;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 public class Hl7UtilTest {
@@ -48,11 +51,17 @@ public class Hl7UtilTest {
             + "565-33-2222|||||^^^^^USA|||UNKNOWN|||||||||||||||||||||||||||||" + '\r'
             + "UB2||||||||" + '\r';
 
-    static final String EXPTECTED_ACKNOWLEDGEMENT_PAYLOAD =
+    static final String EXPECTED_ACKNOWLEDGEMENT_PAYLOAD_START = MllpProtocolConstants.START_OF_BLOCK + "MSH|^~\\&|JCAPS|CC|ADT|EPIC|";
+
+    static final String EXPECTED_ACKNOWLEDGEMENT_PAYLOAD_END = "||ACK^A08|00001A|D|2.3^^|||||||\r"
+        + "MSA|AA|00001\r"
+        + MllpProtocolConstants.END_OF_BLOCK + MllpProtocolConstants.END_OF_DATA;
+
+    static final String EXPECTED_ACKNOWLEDGEMENT_PAYLOAD =
         MllpProtocolConstants.START_OF_BLOCK
-            + "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||\r"
-            + "MSA|AA|00001\r"
-            + MllpProtocolConstants.END_OF_BLOCK + MllpProtocolConstants.END_OF_DATA;
+        + "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919||ACK^A08|00001A|D|2.3^^|||||||\r"
+        + "MSA|AA|00001\r"
+        + MllpProtocolConstants.END_OF_BLOCK + MllpProtocolConstants.END_OF_DATA;
 
     static final String EXPECTED_MESSAGE =
         "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||" + "<0x0D CR>"
@@ -179,7 +188,10 @@ public class Hl7UtilTest {
         MllpSocketBuffer mllpSocketBuffer = new MllpSocketBuffer(new MllpEndpointStub());
         Hl7Util.generateAcknowledgementPayload(mllpSocketBuffer, TEST_MESSAGE.getBytes(), "AA");
 
-        assertEquals(EXPTECTED_ACKNOWLEDGEMENT_PAYLOAD, mllpSocketBuffer.toString());
+        String actual = mllpSocketBuffer.toString();
+
+        assertThat(actual, startsWith(EXPECTED_ACKNOWLEDGEMENT_PAYLOAD_START));
+        assertThat(actual, endsWith(EXPECTED_ACKNOWLEDGEMENT_PAYLOAD_END));
     }
 
     /**
@@ -187,12 +199,12 @@ public class Hl7UtilTest {
      *
      * @throws Exception in the event of a test error.
      */
-    @Test(expected = Hl7AcknowledgementGenerationException.class)
+    @Test(expected = MllpAcknowledgementGenerationException.class)
     public void testGenerateAcknowledgementPayloadFromNullMessage() throws Exception {
         MllpSocketBuffer mllpSocketBuffer = new MllpSocketBuffer(new MllpEndpointStub());
         Hl7Util.generateAcknowledgementPayload(mllpSocketBuffer, null, "AA");
 
-        assertEquals(EXPTECTED_ACKNOWLEDGEMENT_PAYLOAD, mllpSocketBuffer.toString());
+        assertEquals(EXPECTED_ACKNOWLEDGEMENT_PAYLOAD, mllpSocketBuffer.toString());
     }
 
     /**
@@ -200,12 +212,12 @@ public class Hl7UtilTest {
      *
      * @throws Exception in the event of a test error.
      */
-    @Test(expected = Hl7AcknowledgementGenerationException.class)
+    @Test(expected = MllpAcknowledgementGenerationException.class)
     public void testGenerateAcknowledgementPayloadFromEmptyMessage() throws Exception {
         MllpSocketBuffer mllpSocketBuffer = new MllpSocketBuffer(new MllpEndpointStub());
         Hl7Util.generateAcknowledgementPayload(mllpSocketBuffer, new byte[0], "AA");
 
-        assertEquals(EXPTECTED_ACKNOWLEDGEMENT_PAYLOAD, mllpSocketBuffer.toString());
+        assertEquals(EXPECTED_ACKNOWLEDGEMENT_PAYLOAD, mllpSocketBuffer.toString());
     }
 
     /**
@@ -213,12 +225,12 @@ public class Hl7UtilTest {
      *
      * @throws Exception in the event of a test error.
      */
-    @Test
+    @Test(expected = MllpAcknowledgementGenerationException.class)
     public void testGenerateAcknowledgementPayloadWithoutEnoughFields() throws Exception {
-        MllpSocketBuffer mllpSocketBuffer = new MllpSocketBuffer(new MllpEndpointStub());
-        Hl7Util.generateAcknowledgementPayload(mllpSocketBuffer, TEST_MESSAGE.replaceFirst("|RISTECH|ADT^A08|00001|D|2.3^^|||||||", "").getBytes(), "AA");
+        final byte[] testMessage = TEST_MESSAGE.replace("|RISTECH|ADT^A08|00001|D|2.3^^|||||||", "").getBytes();
 
-        assertEquals(EXPTECTED_ACKNOWLEDGEMENT_PAYLOAD, mllpSocketBuffer.toString());
+        MllpSocketBuffer mllpSocketBuffer = new MllpSocketBuffer(new MllpEndpointStub());
+        Hl7Util.generateAcknowledgementPayload(mllpSocketBuffer, testMessage, "AA");
     }
 
     /**
@@ -231,17 +243,15 @@ public class Hl7UtilTest {
         String junkMessage = "MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||"
             + "EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||";
 
-        String junkAcknowledgement =
-            MllpProtocolConstants.START_OF_BLOCK
-                + "MSH|^~\\&|JCAPS|CC|ADT|EPIC|20161206193919|RISTECH|ACK^A08|00001|D|2.3^^|||||||"
-                + "EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC|\r"
-                + "MSA|AA|00001\r"
-                + MllpProtocolConstants.END_OF_BLOCK + MllpProtocolConstants.END_OF_DATA;
-
         MllpSocketBuffer mllpSocketBuffer = new MllpSocketBuffer(new MllpEndpointStub());
         Hl7Util.generateAcknowledgementPayload(mllpSocketBuffer, junkMessage.getBytes(), "AA");
 
-        assertEquals(junkAcknowledgement, mllpSocketBuffer.toString());
+        String actual = mllpSocketBuffer.toString();
+
+        assertThat(actual, startsWith(EXPECTED_ACKNOWLEDGEMENT_PAYLOAD_START));
+        assertThat(actual, endsWith("|EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC|\r"
+                                    + "MSA|AA|00001\r"
+                                    + MllpProtocolConstants.END_OF_BLOCK + MllpProtocolConstants.END_OF_DATA));
     }
 
     /**

-- 
To stop receiving notification emails like this one, please contact
quinn@apache.org.