You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/20 16:12:37 UTC

[3/3] camel git commit: Make MLLP Decoder more robust when requests are flooding and more flexible on charset errors

Make MLLP Decoder more robust when requests are flooding and more flexible on charset errors


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8fa474fc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8fa474fc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8fa474fc

Branch: refs/heads/master
Commit: 8fa474fc952ae3b5c469c51c74ad163661f83bce
Parents: 901b44e
Author: christian ohr <ch...@gmail.com>
Authored: Thu Mar 16 14:41:10 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 20 17:11:46 2017 +0100

----------------------------------------------------------------------
 .../camel/component/hl7/HL7MLLPConfig.java      |  21 +++
 .../camel/component/hl7/HL7MLLPDecoder.java     | 135 ++++++++++--------
 .../hl7/HL7MLLPCodecMessageFloodingTest.java    | 138 +++++++++++++++++++
 3 files changed, 236 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8fa474fc/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
index 2d5abb2..5f580a7 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.hl7;
 
 import java.nio.charset.Charset;
+import java.nio.charset.CodingErrorAction;
 
 import ca.uhn.hl7v2.DefaultHapiContext;
 import ca.uhn.hl7v2.HapiContext;
@@ -41,6 +42,10 @@ public class HL7MLLPConfig {
 
     private boolean produceString = true;
 
+	private CodingErrorAction malformedInputErrorAction = CodingErrorAction.REPORT;
+	
+    private CodingErrorAction unmappableCharacterErrorAction = CodingErrorAction.REPORT;
+	
     public Charset getCharset() {
         return charset;
     }
@@ -113,4 +118,20 @@ public class HL7MLLPConfig {
     public void setProduceString(boolean produceString) {
         this.produceString = produceString;
     }
+	
+    public CodingErrorAction getMalformedInputErrorAction() {
+        return malformedInputErrorAction;
+    }
+
+	public void setMalformedInputErrorAction(CodingErrorAction malformedInputErrorAction) {
+        this.malformedInputErrorAction = malformedInputErrorAction;
+    }
+	
+    public CodingErrorAction getUnmappableCharacterErrorAction() {
+        return unmappableCharacterErrorAction;
+    }
+
+    public void setUnmappableCharacterErrorAction(CodingErrorAction unmappableCharacterErrorAction) {
+        this.unmappableCharacterErrorAction = unmappableCharacterErrorAction;
+    }	
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8fa474fc/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
index 4a54d47..2866aca 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
@@ -42,76 +42,89 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
         this.config = config;
     }
 
-
     @Override
-    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
+    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
 
         // Get the state of the current message and
-        // Skip what we have already scanned
+        // Skip what we have already scanned before
         DecoderState state = decoderState(session);
         in.position(state.current());
 
+        LOG.debug("Received data, checking from position {} to {}", in.position(), in.limit());
+        boolean messageDecoded = false;
+
         while (in.hasRemaining()) {
+
+            int previousPosition = in.position();
             byte current = in.get();
 
-            // If it is the start byte and mark the position
-            if (current == config.getStartByte()) {
-                state.markStart(in.position() - 1);
-            }
-            // If it is the end bytes, extract the payload and return
-            if (state.previous() == config.getEndByte1() && current == config.getEndByte2()) {
-
-                // Remember the current position and limit.
-                int position = in.position();
-                int limit = in.limit();
-                LOG.debug("Message ends at position {} with length {}",
-                        position, position - state.start());
-                try {
+            // Check if we are at the end of an HL7 message
+            if (current == config.getEndByte2() && state.previous() == config.getEndByte1()) {
+                if (state.isStarted()) {
+                    // Save the current buffer pointers and reset them to surround the identified message
+                    int currentPosition = in.position();
+                    int currentLimit = in.limit();
+                    LOG.debug("Message ends at position {} with length {}", previousPosition, previousPosition - state.start() + 1);
                     in.position(state.start());
-                    in.limit(position);
-                    // The bytes between in.position() and in.limit()
-                    // now contain a full MLLP message including the
-                    // start and end bytes.
-                    out.write(config.isProduceString()
-                            ? parseMessageToString(in.slice(), charsetDecoder(session))
-                            : parseMessageToByteArray(in.slice()));
-                } catch (CharacterCodingException cce) {
-                    throw new IllegalArgumentException("Exception while finalizing the message", cce);
-                } finally {
-                    // Reset position, limit, and state
-                    in.limit(limit);
-                    in.position(position);
-                    state.reset();
+                    in.limit(currentPosition);
+                    LOG.debug("Set start to position {} and limit to {}", in.position(), in.limit());
+
+                    // Now create string or byte[] from this part of the buffer and restore the buffer pointers
+                    try {
+                        out.write(config.isProduceString()
+                                ? parseMessageToString(in.slice(), charsetDecoder(session))
+                                : parseMessageToByteArray(in.slice()));
+                        messageDecoded = true;
+                    } finally {
+                        LOG.debug("Resetting to position {} and limit to {}", currentPosition, currentLimit);
+                        in.position(currentPosition);
+                        in.limit(currentLimit);
+                        state.reset();
+                    }
+                } else {
+                    LOG.warn("Ignoring message end at position {} until start byte has been seen.", previousPosition);
+                }
+            } else {
+                // Check if we are at the start of an HL7 message
+                if (current == config.getStartByte()) {
+                    state.markStart(previousPosition);
+                } else {
+                    // Remember previous byte in state object because the buffer could
+                    // be theoretically exhausted right between the two end bytes
+                    state.markPrevious(current);
                 }
-                return true;
+                messageDecoded = false;
             }
-            // Remember previous byte in state object because the buffer could
-            // be theoretically exhausted right between the two end bytes
-            state.markPrevious(current);
         }
 
-        // Could not find a complete message in the buffer.
-        // Reset to the initial position and return false so that this method
-        // is called again with more data.
-        LOG.debug("No complete message yet at position {} ", in.position());
-        state.markCurrent(in.position());
-        in.position(0);
-        return false;
+        if (!messageDecoded) {
+            // Could not find a complete message in the buffer.
+            // Reset to the initial position (just as if nothing had been read yet)
+            // and return false so that this method is called again with more data.
+            LOG.debug("No complete message yet at position {} ", in.position());
+            state.markCurrent(in.position());
+            in.position(0);
+        }
+        return messageDecoded;
     }
 
     // Make a defensive byte copy (the buffer will be reused)
     // and omit the start and the two end bytes of the MLLP message
     // returning a byte array
-    private Object parseMessageToByteArray(IoBuffer slice) throws CharacterCodingException {
-        byte[] dst = new byte[slice.limit() - 3];
-        slice.skip(1); // skip start byte
-        slice.get(dst, 0, dst.length);
+    private Object parseMessageToByteArray(IoBuffer buf) throws CharacterCodingException {
+        int len = buf.limit() - 3;
+        LOG.debug("Making byte array of length {}", len);
+        byte[] dst = new byte[len];
+        buf.skip(1); // skip start byte
+        buf.get(dst, 0, len);
+        buf.skip(2); // skip end bytes
 
         // Only do this if conversion is enabled
         if (config.isConvertLFtoCR()) {
+            LOG.debug("Replacing LF by CR");
             for (int i = 0; i < dst.length; i++) {
-                if (dst[i] == (byte)'\n') {
-                    dst[i] = (byte)'\r';
+                if (dst[i] == (byte) '\n') {
+                    dst[i] = (byte) '\r';
                 }
             }
         }
@@ -121,12 +134,16 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
     // Make a defensive byte copy (the buffer will be reused)
     // and omit the start and the two end bytes of the MLLP message
     // returning a String
-    private Object parseMessageToString(IoBuffer slice, CharsetDecoder decoder) throws CharacterCodingException {
-        slice.skip(1); // skip start byte
-        String message = slice.getString(slice.limit() - 3, decoder);
+    private Object parseMessageToString(IoBuffer buf, CharsetDecoder decoder) throws CharacterCodingException {
+        int len = buf.limit() - 3;
+        LOG.debug("Making string of length {} using charset {}", len, decoder.charset());
+        buf.skip(1); // skip start byte
+        String message = buf.getString(len, decoder);
+        buf.skip(2); // skip end bytes
 
         // Only do this if conversion is enabled
         if (config.isConvertLFtoCR()) {
+            LOG.debug("Replacing LF by CR");
             message = message.replace('\n', '\r');
         }
         return message;
@@ -142,7 +159,9 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
         synchronized (session) {
             CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(CHARSET_DECODER);
             if (decoder == null) {
-                decoder = config.getCharset().newDecoder();
+                decoder = config.getCharset().newDecoder()
+                    .onMalformedInput(config.getMalformedInputErrorAction())
+                    .onUnmappableCharacter(config.getUnmappableCharacterErrorAction());
                 session.setAttribute(CHARSET_DECODER, decoder);
             }
             return decoder;
@@ -164,25 +183,22 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
      * Holds the state of the decoding process
      */
     private static class DecoderState {
-        private int startPos;
+        private int startPos = -1;
         private int currentPos;
         private byte previousByte;
-        private boolean started;
 
         void reset() {
-            startPos = 0;
+            startPos = -1;
             currentPos = 0;
-            started = false;
             previousByte = 0;
         }
 
         void markStart(int position) {
-            if (started) {
+            if (isStarted()) {
                 LOG.warn("Ignoring message start at position {} before previous message has ended.", position);
             } else {
                 startPos = position;
                 LOG.debug("Message starts at position {}", startPos);
-                started = true;
             }
         }
 
@@ -205,6 +221,9 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
         public byte previous() {
             return previousByte;
         }
-    }
 
+        public boolean isStarted() {
+            return startPos >= 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8fa474fc/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
new file mode 100644
index 0000000..17133cc
--- /dev/null
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hl7;
+
+import ca.uhn.hl7v2.model.Message;
+import ca.uhn.hl7v2.model.v24.message.ADR_A19;
+import ca.uhn.hl7v2.model.v24.segment.MSA;
+import ca.uhn.hl7v2.model.v24.segment.MSH;
+import ca.uhn.hl7v2.model.v24.segment.QRD;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for the HL7MLLP Codec.
+ */
+public class HL7MLLPCodecMessageFloodingTest extends HL7TestSupport {
+    
+
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        HL7MLLPCodec codec = new HL7MLLPCodec();
+        codec.setCharset("ISO-8859-1");
+        codec.setConvertLFtoCR(false);
+        jndi.bind("hl7codec", codec);
+        return jndi;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec")
+                    .unmarshal().hl7()
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            Message input = exchange.getIn().getBody(Message.class);
+                            Message response = input.generateACK();
+                            exchange.getOut().setBody(response);
+                            Thread.sleep(50); // simulate some processing time
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+
+    @Test
+    public void testHL7MessageFlood() throws Exception {
+
+        // Write and receive using plain sockets and in different threads
+        Socket socket = new Socket("localhost", getPort());
+        BufferedOutputStream outputStream = new BufferedOutputStream(new DataOutputStream(socket.getOutputStream()));
+        final BufferedInputStream inputStream = new BufferedInputStream(new DataInputStream(socket.getInputStream()));
+
+        int messageCount = 100;
+        CountDownLatch latch = new CountDownLatch(messageCount);
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int response;
+                StringBuilder s = new StringBuilder();
+                try {
+                    int i = 0;
+                    boolean cont = true;
+                    while (cont && (response = inputStream.read()) >= 0) {
+                        if (response == 28) {
+                            response = inputStream.read(); // read second end byte
+                            if (response == 13) {
+                                // Responses must arrive in same order
+                                cont = s.toString().contains(String.format("X%dX", i++));
+                                s.setLength(0);
+                                latch.countDown();
+                            }
+                        } else {
+                            s.append((char) response);
+                        }
+                    }
+                } catch (IOException ignored) {
+                }
+            }
+        });
+        t.start();
+
+        String in = "MSH|^~\\&|MYSENDER|MYRECEIVER|MYAPPLICATION||200612211200||QRY^A19|X%dX|P|2.4\r" +
+                "QRD|200612211200|R|I|GetPatient|||1^RD|0101701234|DEM||";
+        for (int i = 0; i < messageCount; i++) {
+            String msg = String.format(in, i);
+            outputStream.write(11);
+            outputStream.flush();
+            // Some systems send the start byte in a separate frame
+            // Thread.sleep(10);
+            outputStream.write(msg.getBytes());
+            outputStream.flush();
+            // Some systems send end bytes in a separate frame
+            // Thread.sleep(10);
+            outputStream.write(28);
+            outputStream.write(13);
+            outputStream.flush();
+            // Potentially wait after message
+            // Thread.sleep(10);
+        }
+
+        boolean success = latch.await(20, TimeUnit.SECONDS);
+
+        outputStream.close();
+        inputStream.close();
+        socket.close();
+
+        assertTrue(success);
+    }
+
+}