You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/10/11 09:04:19 UTC

git commit: CAMEL-7888 Fix the issue that HL7Decoder leaks memory

Repository: camel
Updated Branches:
  refs/heads/master c3715c4ce -> 402e5a1d0


CAMEL-7888 Fix the issue that HL7Decoder leaks memory


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/402e5a1d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/402e5a1d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/402e5a1d

Branch: refs/heads/master
Commit: 402e5a1d07fccefacea85cb6a19ab669085202b1
Parents: c3715c4
Author: Willem Jiang <wi...@gmail.com>
Authored: Sat Oct 11 15:04:05 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sat Oct 11 15:04:05 2014 +0800

----------------------------------------------------------------------
 .../camel/component/hl7/HL7Converter.java       |  10 +
 .../camel/component/hl7/HL7MLLPDecoder.java     | 217 ++++++++++---------
 .../camel/component/hl7/HL7MLLPEncoder.java     |  34 +--
 .../component/hl7/HL7MLLPCodecLongTest.java     |   4 +-
 .../hl7/HL7MLLPCodecPlainStringTest.java        |   2 +-
 .../hl7/HL7MLLPCodecStandAndEndBytesTest.java   |   2 +-
 .../camel/component/hl7/HL7MLLPCodecTest.java   |   2 +-
 .../camel/component/hl7/HL7RouteTest.java       |  17 +-
 8 files changed, 150 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7Converter.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7Converter.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7Converter.java
index 8898647..e8c98fb 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7Converter.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7Converter.java
@@ -42,10 +42,20 @@ public final class HL7Converter {
     }
 
     @Converter
+    public static byte[] toByteArray(Message message) throws HL7Exception {
+        return message.encode().getBytes();
+    }
+
+    @Converter
     public static Message toMessage(String body) throws HL7Exception {
         return parse(body, DEFAULT_CONTEXT.getGenericParser());
     }
 
+    @Converter
+    public static Message toMessage(byte[] body) throws HL7Exception {
+        return parse(new String(body), DEFAULT_CONTEXT.getGenericParser());
+    }
+
     static Message parse(String body, Parser parser) throws HL7Exception {
         return parser.parse(body);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
index a2b8e2f..3c616e1 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
@@ -27,15 +27,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * HL7MLLPDecoder that is aware that a HL7 message can span several TCP packets.
+ * HL7MLLPDecoder that is aware that a HL7 message can span several buffers.
  * In addition, it avoids rescanning packets by keeping state in the IOSession.
  */
 class HL7MLLPDecoder extends CumulativeProtocolDecoder {
 
     private static final Logger LOG = LoggerFactory.getLogger(HL7MLLPDecoder.class);
-
-    private static final String CHARSET_DECODER = HL7MLLPDecoder.class.getName() + ".charsetdecoder";
     private static final String DECODER_STATE = HL7MLLPDecoder.class.getName() + ".STATE";
+    private static final String CHARSET_DECODER = HL7MLLPDecoder.class.getName() + ".charsetdecoder";
 
     private HL7MLLPConfig config;
 
@@ -43,132 +42,148 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
         this.config = config;
     }
 
+
     @Override
     protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
 
-        // Scan the buffer of start and/or end bytes
-        boolean foundEnd = scan(session, in);
+        // Get the state of the current message and
+        // Skip what we have already scanned
+        DecoderState state = decoderState(session);
+        in.position(state.current());
+
+        while (in.hasRemaining()) {
+            byte current = in.get();
 
-        // Write HL7 string or wait until message end arrives or buffer ends
-        if (foundEnd) {
-            writeString(session, in, out);
-        } else {
-            LOG.debug("No complete message in this packet");
+            // If it is the start byte and mark the position
+            if (current == config.getStartByte()) {
+                state.markStart(in.position() - 1);
+            }
+            // If it is the end bytes, extract the payload and return
+            if (state.previous() == config.getEndByte1() && current == config.getEndByte2()) {
+
+                // Remember the current position and limit.
+                int position = in.position();
+                int limit = in.limit();
+                LOG.debug("Message ends at position {} with length {}",
+                        position, position - state.start());
+                try {
+                    in.position(state.start());
+                    in.limit(position);
+                    // The bytes between in.position() and in.limit()
+                    // now contain a full MLLP message including the
+                    // start and end bytes.
+                    out.write(parseMessage(in.slice(), charsetDecoder(session)));
+                } catch (CharacterCodingException cce) {
+                    throw new IllegalArgumentException("Exception while finalizing the message", cce);
+                } finally {
+                    // Reset position, limit, and state
+                    in.limit(limit);
+                    in.position(position);
+                    state.reset();
+                }
+                return true;
+            }
+            // Remember previous byte in state object because the buffer could
+            // be theoretically exhausted right between the two end bytes
+            state.markPrevious(current);
         }
 
-        return foundEnd;
+        // Could not find a complete message in the buffer.
+        // Reset to the initial position and return false so that this method
+        // is called again with more data.
+        LOG.debug("No complete message yet at position {} ", in.position());
+        state.markCurrent(in.position());
+        in.position(0);
+        return false;
     }
 
-    private void writeString(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
-        DecoderState state = decoderState(session);
-        if (state.posStart == 0) {
-            LOG.warn("No start byte found, reading from beginning of data");
-        }
-        // start reading from the buffer after the start markers
-        in.position(state.posStart);
-        try {
-            String body = in.getString(state.length(), charsetDecoder(session));
-            LOG.debug("Decoded HL7 from byte stream of length {} to String of length {}", state.length(), body.length());
-            if (config.isConvertLFtoCR()) {
-                body = body.replace('\n', '\r');
-            }
-            out.write(body);
-            // Avoid redelivery of scanned message
-            state.reset();
-        } catch (CharacterCodingException e) {
-            throw new RuntimeException(e);
+    // Make a defensive byte copy (the buffer will be reused)
+    // and omit the start and the two end bytes of the MLLP message
+    // TODO: I wonder if it would make sense to return the plain byte array and let some subsequent
+    // processor do the conversion
+    private Object parseMessage(IoBuffer slice, CharsetDecoder decoder) throws CharacterCodingException {
+        slice.skip(1); // skip start byte
+        String message = slice.getString(slice.limit() - 3, decoder);
+
+        // Only do this if conversion is enabled
+        if (config.isConvertLFtoCR()) {
+            message = message.replace('\n', '\r');
         }
+        return message;
     }
 
-    private CharsetDecoder charsetDecoder(IoSession session) {
-        // convert to string using the charset decoder
-        CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(CHARSET_DECODER);
-        if (decoder == null) {
-            decoder = config.getCharset().newDecoder();
-            session.setAttribute(CHARSET_DECODER, decoder);
-        }
-        return decoder;
+    @Override
+    public void dispose(IoSession session) throws Exception {
+        session.removeAttribute(DECODER_STATE);
+        session.removeAttribute(CHARSET_DECODER);
     }
 
-    /**
-     * Scans the buffer for start and end bytes and stores its position in the
-     * session state object.
-     *
-     * @return <code>true</code> if the end bytes were found, <code>false</code>
-     *         otherwise
-     */
-    private boolean scan(IoSession session, IoBuffer in) {
-        DecoderState state = decoderState(session);
-        // Start scanning where we left
-        in.position(state.current);
-        LOG.debug("Start scanning buffer at position {}", in.position());
-        while (in.hasRemaining()) {
-            byte b = in.get();
-            // Check start byte
-            if (b == config.getStartByte()) {
-                if (state.posStart > 0 || state.waitingForEndByte2) {
-                    LOG.warn("Ignoring message start at position {} before previous message has ended.", in.position());
-                } else {
-                    state.posStart = in.position();
-                    state.waitingForEndByte2 = false;
-                    LOG.debug("Message starts at position {}", state.posStart);
-                }
-            }
-            // Check end byte1 
-            if (b == config.getEndByte1()) {
-                if (!state.waitingForEndByte2 && state.posStart > 0) {
-                    state.waitingForEndByte2 = true;
-                } else {
-                    LOG.warn("Ignoring unexpected 1st end byte {}. Expected 2nd endpoint {}", b, config.getEndByte2());
-                }
-            }
-            // Check end byte2 
-            if (b == config.getEndByte2() && state.waitingForEndByte2) {
-                state.posEnd = in.position() - 2; // use -2 to skip these
-                // last 2 end markers
-                state.waitingForEndByte2 = false;
-                LOG.debug("Message ends at position {}", state.posEnd);
-                break;
+    private CharsetDecoder charsetDecoder(IoSession session) {
+        synchronized (session) {
+            CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(CHARSET_DECODER);
+            if (decoder == null) {
+                decoder = config.getCharset().newDecoder();
+                session.setAttribute(CHARSET_DECODER, decoder);
             }
+            return decoder;
         }
-        // Remember where we are
-        state.current = in.position();
-        in.rewind();
-        return state.posEnd > 0;
     }
 
     private DecoderState decoderState(IoSession session) {
-        DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE);
-        if (decoderState == null) {
-            decoderState = new DecoderState();
-            session.setAttribute(DECODER_STATE, decoderState);
+        synchronized (session) {
+            DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE);
+            if (decoderState == null) {
+                decoderState = new DecoderState();
+                session.setAttribute(DECODER_STATE, decoderState);
+            }
+            return decoderState;
         }
-        return decoderState;
-    }
-
-    @Override
-    public void dispose(IoSession session) throws Exception {
-        session.removeAttribute(CHARSET_DECODER);
-        session.removeAttribute(DECODER_STATE);
     }
 
     /**
      * Holds the state of the decoding process
      */
     private static class DecoderState {
-        int posStart;
-        int posEnd;
-        int current;
-        boolean waitingForEndByte2;
+        private int startPos;
+        private int currentPos;
+        private byte previousByte;
+        private boolean started;
+
+        void reset() {
+            startPos = 0;
+            currentPos = 0;
+            started = false;
+            previousByte = 0;
+        }
+
+        void markStart(int position) {
+            if (started) {
+                LOG.warn("Ignoring message start at position {} before previous message has ended.", position);
+            } else {
+                startPos = position;
+                LOG.debug("Message starts at position {}", startPos);
+                started = true;
+            }
+        }
 
-        int length() {
-            return posEnd - posStart;
+        void markCurrent(int position) {
+            currentPos = position;
         }
 
-        void reset() {
-            posStart = 0;
-            posEnd = 0;
-            waitingForEndByte2 = false;
+        void markPrevious(byte previous) {
+            previousByte = previous;
+        }
+
+        public int start() {
+            return startPos;
+        }
+
+        public int current() {
+            return currentPos;
+        }
+
+        public byte previous() {
+            return previousByte;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPEncoder.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPEncoder.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPEncoder.java
index 4ac5c42..01291d4 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPEncoder.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPEncoder.java
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.component.hl7;
 
-import java.nio.charset.CharsetEncoder;
 import ca.uhn.hl7v2.model.Message;
-
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolEncoder;
@@ -33,60 +31,46 @@ class HL7MLLPEncoder implements ProtocolEncoder {
 
     private static final Logger LOG = LoggerFactory.getLogger(HL7MLLPEncoder.class);
 
-    private static final String CHARSET_ENCODER = HL7MLLPCodec.class.getName() + ".charsetencoder";
-
     private HL7MLLPConfig config;
 
     HL7MLLPEncoder(HL7MLLPConfig config) {
         this.config = config;
     }
 
+    @Override
     public void dispose(IoSession session) throws Exception {
-        session.removeAttribute(CHARSET_ENCODER);
     }
 
     public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
         if (message == null) {
-            throw new IllegalArgumentException("Message to encode is null");
+            throw new IllegalArgumentException("Message to be encoded is null");
         } else if (message instanceof Exception) {
             // we cannot handle exceptions
             throw (Exception)message;
         }
 
-        CharsetEncoder encoder = (CharsetEncoder)session.getAttribute(CHARSET_ENCODER);
-        if (encoder == null) {
-            encoder = config.getCharset().newEncoder();
-            session.setAttribute(CHARSET_ENCODER, encoder);
-        }
-
-        // convert to string
-        String body;
+        byte[] body;
         if (message instanceof Message) {
-            body = HL7Converter.encode((Message)message, config.getParser());
+            body = ((Message) message).encode().getBytes(config.getCharset());
         } else if (message instanceof String) {
-            body = (String)message;
+            body = ((String) message).getBytes(config.getCharset());
         } else if (message instanceof byte[]) {
-            body = new String((byte[])message);
+            body = (byte[])message;
         } else {
             throw new IllegalArgumentException("The message to encode is not a supported type: "
                                                + message.getClass().getCanonicalName());
         }
 
-        // replace \n with \r as HL7 uses 0x0d = \r as segment terminators
-        if (config.isConvertLFtoCR()) {
-            body = body.replace('\n', '\r');
-        }
-
         // put the data into the byte buffer
-        IoBuffer buf = IoBuffer.allocate(body.length() + 3).setAutoExpand(true);
+        IoBuffer buf = IoBuffer.allocate(body.length + 3).setAutoExpand(true);
         buf.put((byte)config.getStartByte());
-        buf.putString(body, encoder);
+        buf.put(body);
         buf.put((byte)config.getEndByte1());
         buf.put((byte)config.getEndByte2());
 
         // flip the buffer so we can use it to write to the out stream
         buf.flip();
-        LOG.debug("Encoding HL7 from {} to byte stream", message.getClass().getCanonicalName());
+        LOG.debug("Encoded HL7 from {} to byte stream", message.getClass().getCanonicalName());
         out.write(buf);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecLongTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecLongTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecLongTest.java
index 96b59e7..febdbaf 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecLongTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecLongTest.java
@@ -55,7 +55,7 @@ public class HL7MLLPCodecLongTest extends HL7TestSupport {
             public void configure() throws Exception {
                 from("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        assertEquals(70010, exchange.getIn().getBody().toString().length());
+                        assertEquals(70010, exchange.getIn().getBody(byte[].class).length);
                         MDM_T02 input = (MDM_T02)exchange.getIn().getBody(Message.class);
                         assertEquals("2.5", input.getVersion());
                         MSH msh = input.getMSH();
@@ -80,7 +80,7 @@ public class HL7MLLPCodecLongTest extends HL7TestSupport {
         }
         message = message.substring(0, message.length() - 1);
         assertEquals(70010, message.length());
-        String out = (String)template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", message);
+        String out = template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", message, String.class);
         assertEquals("some response", out);
         // END SNIPPET: e2
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecPlainStringTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecPlainStringTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecPlainStringTest.java
index 7b182b3..e22a915 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecPlainStringTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecPlainStringTest.java
@@ -47,7 +47,7 @@ public class HL7MLLPCodecPlainStringTest extends HL7TestSupport {
         mock.expectedBodiesReceived("Bye World");
 
         // send plain hello world as String
-        Object out = template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", "Hello World");
+        Object out = template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", "Hello World", String.class);
 
         assertMockEndpointsSatisfied();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecStandAndEndBytesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecStandAndEndBytesTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecStandAndEndBytesTest.java
index f7c03ac..96928d7 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecStandAndEndBytesTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecStandAndEndBytesTest.java
@@ -80,7 +80,7 @@ public class HL7MLLPCodecStandAndEndBytesTest extends HL7TestSupport {
         in.append("\r");
         in.append(line2);
 
-        String out = (String)template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString());
+        String out = template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString(), String.class);
 
         String[] lines = out.split("\r");
         assertEquals("MSH|^~\\&|MYSENDER||||200701011539||ADR^A19||||123", lines[0]);

http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecTest.java
index 743334b..1d9afef 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecTest.java
@@ -79,7 +79,7 @@ public class HL7MLLPCodecTest extends HL7TestSupport {
         in.append("\n");
         in.append(line2);
 
-        String out = (String)template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString());
+        String out = template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString(), String.class);
         // END SNIPPET: e2
 
         String[] lines = out.split("\r");

http://git-wip-us.apache.org/repos/asf/camel/blob/402e5a1d/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7RouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7RouteTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7RouteTest.java
index 8222e87..1364a66 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7RouteTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7RouteTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.hl7;
 
+import java.security.SecureRandom;
+import java.util.Random;
+
 import ca.uhn.hl7v2.model.Message;
 import ca.uhn.hl7v2.model.v24.message.ADR_A19;
 import ca.uhn.hl7v2.model.v24.message.ADT_A01;
@@ -63,7 +66,7 @@ public class HL7RouteTest extends HL7TestSupport {
         in.append("\r");
         in.append(line2);
 
-        String out = (String) template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString());
+        String out = template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString(), String.class);
 
         String[] lines = out.split("\r");
         assertEquals("MSH|^~\\&|MYSENDER||||200701011539||ADR^A19||||123", lines[0]);
@@ -78,7 +81,7 @@ public class HL7RouteTest extends HL7TestSupport {
         mock.expectedMessageCount(1);
         mock.message(0).body().isInstanceOf(Message.class);
 
-        String line1 = "MSH|^~\\&|MYSENDER|MYSENDERAPP|MYCLIENT|MYCLIENTAPP|200612211200||ADT^A01|1234|P|2.4";
+        String line1 = "MSH|^~\\&|MYSENDER|MYSENDERAPP|MYCLIENT|MYCLIENTAPP|200612211200||ADT^A01|123|P|2.4";
         String line2 = "PID|||123456||Doe^John";
 
         StringBuilder in = new StringBuilder();
@@ -86,10 +89,10 @@ public class HL7RouteTest extends HL7TestSupport {
         in.append("\r");
         in.append(line2);
 
-        String out = (String) template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString());
+        String out = template.requestBody("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec", in.toString(), String.class);
         String[] lines = out.split("\r");
         assertEquals("MSH|^~\\&|MYSENDER||||200701011539||ADT^A01||||123", lines[0]);
-        assertEquals("PID|||123456||Doe^John", lines[1]);
+        assertEquals("PID|||123||Doe^John", lines[1]);
 
         assertMockEndpointsSatisfied();
     }
@@ -164,7 +167,7 @@ public class HL7RouteTest extends HL7TestSupport {
             // here you can have your business logic for A01 messages
             assertTrue(msg instanceof ADT_A01);
             // just return the same dummy response
-            return createADT01Message();
+            return createADT01Message(((ADT_A01)msg).getMSH().getMessageControlID().getValue());
         }
     }
     // END SNIPPET: e2
@@ -193,7 +196,7 @@ public class HL7RouteTest extends HL7TestSupport {
         return adr.getMessage();
     }
 
-    private static Message createADT01Message() throws Exception {
+    private static Message createADT01Message(String msgId) throws Exception {
         ADT_A01 adt = new ADT_A01();
 
         // Populate the MSH Segment
@@ -210,7 +213,7 @@ public class HL7RouteTest extends HL7TestSupport {
         PID pid = adt.getPID();
         pid.getPatientName(0).getFamilyName().getSurname().setValue("Doe");
         pid.getPatientName(0).getGivenName().setValue("John");
-        pid.getPatientIdentifierList(0).getID().setValue("123456");
+        pid.getPatientIdentifierList(0).getID().setValue(msgId);
 
         return adt;
     }