You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/01/28 09:19:04 UTC

[3/4] camel git commit: Improved Socket cleanup and added support for bindTimeout URI parameter

Improved Socket cleanup and added support for bindTimeout URI parameter


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f09e4b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f09e4b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f09e4b3

Branch: refs/heads/master
Commit: 8f09e4b3110c70ec9896fbd051d816c294f914c4
Parents: d3bfca9
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Mon Jan 25 15:48:53 2016 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 28 09:11:37 2016 +0100

----------------------------------------------------------------------
 components/camel-mllp/pom.xml                   |  13 --
 .../camel/component/mllp/MllpEndpoint.java      |  35 ++++-
 .../component/mllp/MllpTcpClientProducer.java   | 154 ++++++++++---------
 .../component/mllp/MllpTcpServerConsumer.java   |  50 +++++-
 .../camel/component/mllp/impl/MllpUtil.java     |  22 ++-
 .../MllpTcpClientProducerBlueprintTest.java     |   3 +-
 .../MllpTcpServerConsumerBindTimeoutTest.java   | 117 ++++++++++++++
 .../mllp/MllpTcpServerConsumerTest.java         |   2 +
 .../junit/rule/mllp/MllpServerResource.java     | 113 +++++++++++---
 9 files changed, 385 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mllp/pom.xml b/components/camel-mllp/pom.xml
index e49249b..c2cfa0c 100644
--- a/components/camel-mllp/pom.xml
+++ b/components/camel-mllp/pom.xml
@@ -73,17 +73,4 @@
 
   </dependencies>
 
-  <!-- Ensure that test runs do no leave running JVMs -->
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <shutdown>kill</shutdown>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index e93e408..3a274ef 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -47,8 +47,8 @@ public class MllpEndpoint extends DefaultEndpoint {
 
     private static final Logger LOG = LoggerFactory.getLogger(MllpEndpoint.class);
 
-    @UriPath(defaultValue = "0.0.0.0")
-    String hostname = "0.0.0.0";
+    @UriPath(defaultValue = "null")
+    String hostname;
 
     @UriPath(description = "TCP Port for connection")
     int port = -1;
@@ -56,9 +56,12 @@ public class MllpEndpoint extends DefaultEndpoint {
     @UriParam(defaultValue = "5")
     int backlog = 5;
 
-    @UriParam(defaultValue = "30000", description = "TCP Server only - timeout value while waiting for a TCP listener to start (milliseconds)")
+    @UriParam(defaultValue = "30000")
     int bindTimeout = 30000;
 
+    @UriParam(defaultValue = "5000")
+    int bindRetryInterval = 5000;
+
     @UriParam(defaultValue = "60000")
     int acceptTimeout = 60000;
 
@@ -160,7 +163,9 @@ public class MllpEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Hostname or IP for connection for the TCP connection
+     * Hostname or IP for connection for the TCP connection.
+     *
+     * The default value is null, which means any local IP address
      *
      * @param hostname Hostname or IP
      */
@@ -193,6 +198,28 @@ public class MllpEndpoint extends DefaultEndpoint {
         this.backlog = backlog;
     }
 
+    public int getBindTimeout() {
+        return bindTimeout;
+    }
+
+    /**
+     * TCP Server Only - The number of milliseconds to retry binding to a server port
+     */
+    public void setBindTimeout(int bindTimeout) {
+        this.bindTimeout = bindTimeout;
+    }
+
+    public int getBindRetryInterval() {
+        return bindRetryInterval;
+    }
+
+    /**
+     * TCP Server Only - The number of milliseconds to wait between bind attempts
+     */
+    public void setBindRetryInterval(int bindRetryInterval) {
+        this.bindRetryInterval = bindRetryInterval;
+    }
+
     public int getAcceptTimeout() {
         return acceptTimeout;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index f95899f..17979c4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -105,10 +105,11 @@ public class MllpTcpClientProducer extends DefaultProducer {
         }
 
         log.debug("Reading acknowledgement from external system");
-        byte[] acknowledgementBytes;
+        byte[] acknowledgementBytes = null;
         try {
-            MllpUtil.openFrame(socket);
-            acknowledgementBytes = MllpUtil.closeFrame(socket);
+            if (MllpUtil.openFrame(socket)) {
+                acknowledgementBytes = MllpUtil.closeFrame(socket);
+            }
         } catch (SocketTimeoutException timeoutEx) {
             exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx));
             return;
@@ -117,82 +118,84 @@ public class MllpTcpClientProducer extends DefaultProducer {
             return;
         }
 
-        log.debug("Populating the exchange with the acknowledgement from the external system");
-        message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
-
-        message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
-        message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
-
-        // Now, extract the acknowledgement type and check for a NACK
-        byte fieldDelim = acknowledgementBytes[3];
-        // First, find the beginning of the MSA segment - should be the second segment
-        int msaStartIndex = -1;
-        for (int i = 0; i < acknowledgementBytes.length; ++i) {
-            if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
-                final byte bM = 77;
-                final byte bS = 83;
-                final byte bC = 67;
-                final byte bA = 65;
-                final byte bE = 69;
-                final byte bR = 82;
+        if (null != acknowledgementBytes) {
+            log.debug("Populating the exchange with the acknowledgement from the external system");
+            message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+
+            message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
+            message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
+
+            // Now, extract the acknowledgement type and check for a NACK
+            byte fieldDelim = acknowledgementBytes[3];
+            // First, find the beginning of the MSA segment - should be the second segment
+            int msaStartIndex = -1;
+            for (int i = 0; i < acknowledgementBytes.length; ++i) {
+                if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
+                    final byte bM = 77;
+                    final byte bS = 83;
+                    final byte bC = 67;
+                    final byte bA = 65;
+                    final byte bE = 69;
+                    final byte bR = 82;
                         /* We've found the start of a new segment - make sure peeking ahead
                            won't run off the end of the array - we need at least 7 more bytes
                          */
-                if (acknowledgementBytes.length > i + 7) {
-                    // We can safely peek ahead
-                    if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
-                        // Found the beginning of the MSA - the next two bytes should be our acknowledgement code
-                        msaStartIndex = i + 1;
-                        if (bA != acknowledgementBytes[i + 5]  &&  bC != acknowledgementBytes[i + 5]) {
-                            exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
-                        } else {
-                            String acknowledgemenTypeString;
-                            switch (acknowledgementBytes[i + 6]) {
-                            case bA:
-                                // We have an AA or CA- make sure that's the end of the field
-                                if (fieldDelim != acknowledgementBytes[i + 7]) {
-                                    exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
-                                }
-                                if (bA == acknowledgementBytes[i + 5]) {
-                                    message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
-                                } else {
-                                    message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
-                                }
-                                break;
-                            case bE:
-                                // We have an AE or CE
-                                if (bA == acknowledgementBytes[i + 5]) {
-                                    message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
-                                    exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes)));
-                                } else {
-                                    message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
-                                    exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes)));
-                                }
-                                break;
-                            case bR:
-                                // We have an AR or CR
-                                if (bA == acknowledgementBytes[i + 5]) {
-                                    message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
-                                    exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes)));
-                                } else {
-                                    message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
-                                    exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes)));
-                                }
-                                break;
-                            default:
+                    if (acknowledgementBytes.length > i + 7) {
+                        // We can safely peek ahead
+                        if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
+                            // Found the beginning of the MSA - the next two bytes should be our acknowledgement code
+                            msaStartIndex = i + 1;
+                            if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) {
                                 exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+                            } else {
+                                String acknowledgemenTypeString;
+                                switch (acknowledgementBytes[i + 6]) {
+                                    case bA:
+                                        // We have an AA or CA- make sure that's the end of the field
+                                        if (fieldDelim != acknowledgementBytes[i + 7]) {
+                                            exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+                                        }
+                                        if (bA == acknowledgementBytes[i + 5]) {
+                                            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+                                        } else {
+                                            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
+                                        }
+                                        break;
+                                    case bE:
+                                        // We have an AE or CE
+                                        if (bA == acknowledgementBytes[i + 5]) {
+                                            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
+                                            exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes)));
+                                        } else {
+                                            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
+                                            exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes)));
+                                        }
+                                        break;
+                                    case bR:
+                                        // We have an AR or CR
+                                        if (bA == acknowledgementBytes[i + 5]) {
+                                            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
+                                            exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes)));
+                                        } else {
+                                            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
+                                            exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes)));
+                                        }
+                                        break;
+                                    default:
+                                        exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+                                }
                             }
-                        }
 
-                        break;
+                            break;
+                        }
                     }
                 }
-            }
 
-        }
-        if (-1 == msaStartIndex) {
-            // Didn't find an MSA
-            exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+            }
+            if (-1 == msaStartIndex) {
+                // Didn't find an MSA
+                exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+            }
         }
         // Check AFTER_SEND Properties
         if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
@@ -231,10 +234,15 @@ public class MllpTcpClientProducer extends DefaultProducer {
             }
 
 
-            SocketAddress address = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
-            log.debug("Connecting to socket on {}", address);
+            InetSocketAddress socketAddress;
+            if (null == endpoint.getHostname()) {
+                socketAddress = new InetSocketAddress(endpoint.getPort());
+            } else {
+                socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
+            }
+            log.debug("Connecting to socket on {}", socketAddress);
             try {
-                socket.connect(address, endpoint.connectTimeout);
+                socket.connect(socketAddress, endpoint.connectTimeout);
             } catch (SocketTimeoutException e) {
                 return e;
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 05d13f5..e158dc3 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.mllp;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -93,8 +94,27 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         // Accept Timeout
         serverSocket.setSoTimeout(endpoint.acceptTimeout);
 
-        InetSocketAddress socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
-        serverSocket.bind(socketAddress, endpoint.backlog);
+        InetSocketAddress socketAddress;
+        if (null == endpoint.getHostname()) {
+            socketAddress = new InetSocketAddress(endpoint.getPort());
+        } else {
+            socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
+        }
+        long startTicks = System.currentTimeMillis();
+
+        do {
+            try {
+                serverSocket.bind(socketAddress, endpoint.backlog);
+            } catch (BindException bindException) {
+                if (System.currentTimeMillis() > startTicks + endpoint.getBindTimeout()) {
+                    log.error( "Failed to bind to address {} within timeout {}", socketAddress, endpoint.getBindTimeout());
+                    throw bindException;
+                } else {
+                    log.warn( "Failed to bind to address {} - retrying in {} milliseconds", socketAddress, endpoint.getBindRetryInterval());
+                    Thread.sleep(endpoint.getBindRetryInterval());
+                }
+            }
+        } while ( !serverSocket.isBound() );
 
         serverSocketThread = new ServerSocketThread(serverSocket);
         serverSocketThread.start();
@@ -106,6 +126,12 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
     protected void doStop() throws Exception {
         log.debug("doStop()");
 
+        // Close any client sockets that are currently open
+        for (ClientSocketThread clientSocketThread: clientThreads) {
+            clientSocketThread.interrupt();
+        }
+
+
         switch (serverSocketThread.getState()) {
         case TERMINATED:
             // This is what we hope for
@@ -195,7 +221,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
             log.debug("Starting acceptor thread");
 
             try {
-                while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
+                while (!isInterrupted()  &&  null != serverSocket && serverSocket.isBound()  &&  !serverSocket.isClosed()) {
                     // TODO: Need to check maxConnections and figure out what to do when exceeded
                     Socket socket = null;
                     try {
@@ -370,7 +396,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         @Override
         public void run() {
 
-            while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
+            while (!isInterrupted()  &&  null != clientSocket  &&  clientSocket.isConnected()  &&  !clientSocket.isClosed()) {
                 byte[] hl7MessageBytes = null;
                 // Send the message on for processing and wait for the response
                 log.debug("Reading data ....");
@@ -379,7 +405,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                         hl7MessageBytes = MllpUtil.closeFrame(clientSocket);
                     } else {
                         try {
-                            MllpUtil.openFrame(clientSocket);
+                            if (!MllpUtil.openFrame(clientSocket)) {
+                                continue;
+                            }
                         } catch (SocketTimeoutException timeoutEx) {
                             // When thrown by openFrame, it indicates that no data was available - but no error
                             continue;
@@ -605,6 +633,18 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                 }
             }
         }
+
+        @Override
+        public void interrupt() {
+            if (null != clientSocket  &&  clientSocket.isConnected()  && !clientSocket.isClosed()) {
+                try {
+                    clientSocket.close();
+                } catch (IOException ex) {
+                    log.warn("Exception encoutered closing client Socket in interrupt", ex);
+                }
+            }
+            super.interrupt();
+        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
index c1c5aec..72c2514 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 
 import org.apache.camel.component.mllp.MllpComponent;
@@ -67,25 +68,34 @@ public final class MllpUtil {
      * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way
      * @throws MllpException             for other unexpected error conditions
      */
-    public static void openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException {
+    public static boolean openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException {
         if (socket.isConnected() && !socket.isClosed()) {
             InputStream socketInputStream = MllpUtil.getInputStream(socket);
 
-            int readByte;
+            int readByte = -1;
             try {
                 readByte = socketInputStream.read();
                 switch (readByte) {
                 case START_OF_BLOCK:
-                    return;
+                    return true;
                 case END_OF_STREAM:
                     resetConnection(socket);
-                    return;
+                    return false;
                 default:
                     // Continue on and process the out-of-frame data
                 }
             } catch (SocketTimeoutException normaTimeoutEx) {
                 // Just pass this on - the caller will wrap it in a MllpTimeoutException
                 throw normaTimeoutEx;
+            } catch (SocketException socketEx ) {
+                if (socket.isClosed()) {
+                    LOG.debug( "Socket closed while opening MLLP frame - ignoring exception", socketEx);
+                    return false;
+                } else {
+                    LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection");
+                    MllpUtil.resetConnection(socket);
+                    throw new MllpException("Unexpected Exception occurred opening MLLP frame", socketEx);
+                }
             } catch (IOException unexpectedException) {
                 LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection");
                 MllpUtil.resetConnection(socket);
@@ -152,6 +162,8 @@ public final class MllpUtil {
                 throw new MllpCorruptFrameException("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray());
             }
         }
+
+        return false;
     }
 
     /**
@@ -319,7 +331,7 @@ public final class MllpUtil {
     }
 
     public static void resetConnection(Socket socket) {
-        if (null != socket) {
+        if (null != socket  &&  !socket.isClosed()) {
             try {
                 socket.setSoLinger(true, 0);
             } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
index 4233d80..bb87146 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultComponentResolver;
 import org.apache.camel.spi.ComponentResolver;
+import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
 import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
 import org.apache.camel.util.KeyValueHolder;
@@ -36,7 +37,7 @@ import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
 
 public class MllpTcpClientProducerBlueprintTest extends CamelBlueprintTestSupport {
     @Rule
-    public MllpServerResource mllpServer = new MllpServerResource();
+    public MllpServerResource mllpServer = new MllpServerResource(AvailablePortFinder.getNextAvailable());
 
     final String sourceUri = "direct://source";
     final String mockAcknowledgedUri = "mock://acknowledged";

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java
new file mode 100644
index 0000000..3ea892f
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
+import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
+
+public class MllpTcpServerConsumerBindTimeoutTest extends CamelTestSupport {
+    @Rule
+    public MllpClientResource mllpClient = new MllpClientResource();
+
+    @EndpointInject(uri = "mock://result")
+    MockEndpoint result;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
+
+        context.setUseMDCLogging(true);
+        context.setName(this.getClass().getSimpleName());
+
+        return context;
+    }
+
+    @Override
+    public boolean isUseAdviceWith() {
+        return true;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+
+        mllpClient.setMllpHost("localhost");
+        mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable());
+
+        return new RouteBuilder() {
+            int connectTimeout = 500;
+            int responseTimeout = 5000;
+
+            @Override
+            public void configure() throws Exception {
+                String routeId = "mllp-test-receiver-route";
+
+                onCompletion()
+                        .toF("log:%s?level=INFO&showAll=true", routeId)
+                        .log(LoggingLevel.INFO, routeId, "Test route complete");
+
+                fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d",
+                        mllpClient.getMllpHost(), mllpClient.getMllpPort(), connectTimeout, responseTimeout)
+                        .routeId(routeId)
+                        .log(LoggingLevel.INFO, routeId, "Test route received message")
+                        .to(result);
+
+            }
+        };
+    }
+
+    @Test
+    public void testReceiveSingleMessage() throws Exception {
+        result.expectedMessageCount(1);
+
+        Thread tmpThread = new Thread() {
+            public void run() {
+                try {
+                    ServerSocket tmpSocket = new ServerSocket(mllpClient.getMllpPort());
+                    Thread.sleep(15000);
+                    tmpSocket.close();
+                } catch (Exception ex) {
+                    throw new RuntimeException( "Exception caught in dummy listener", ex);
+                }
+            }
+
+        };
+
+        tmpThread.start();
+
+        context.start();
+
+        mllpClient.connect();
+
+        mllpClient.sendMessageAndWaitForAcknowledgement(generateMessage(), 10000);
+
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
index c77c35f..eb6a463 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.mllp;
 
+import java.net.BindException;
+import java.net.ServerSocket;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;

http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
index c148310..8215d1b 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
@@ -24,6 +24,8 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import org.junit.rules.ExternalResource;
@@ -436,11 +438,11 @@ public class MllpServerResource extends ExternalResource {
     }
 
     void resetConnection(Socket socket) {
-        if (null != socket) {
+        if (null != socket  && !socket.isClosed()) {
             try {
                 socket.setSoLinger(true, 0);
-            } catch (Exception ex) {
-                log.warn("Exception encountered setting SO_LINGER to 0 on the socket to force a reset", ex);
+            } catch (SocketException socketEx) {
+                log.debug("SocketException encountered setting SO_LINGER to 0 on the socket to force a reset - ignoring", socketEx);
             } finally {
                 closeConnection(socket);
             }
@@ -455,6 +457,7 @@ public class MllpServerResource extends ExternalResource {
         Logger log = LoggerFactory.getLogger(this.getClass());
 
         ServerSocket serverSocket;
+        List<ClientSocketThread> clientSocketThreads = new LinkedList<>();
 
         String listenHost = "0.0.0.0";
         int listenPort;
@@ -519,30 +522,47 @@ public class MllpServerResource extends ExternalResource {
         public void run() {
             log.info("Accepting connections on port {}", serverSocket.getLocalPort());
             this.setName("MllpServerResource$ServerSocketThread - " + serverSocket.getLocalSocketAddress().toString());
-            while (isActive() && serverSocket.isBound()) {
+            while (!isInterrupted()  &&  serverSocket.isBound()  && !serverSocket.isClosed()) {
                 Socket clientSocket = null;
                 try {
                     clientSocket = serverSocket.accept();
-                    clientSocket.setKeepAlive(true);
-                    clientSocket.setTcpNoDelay(false);
-                    clientSocket.setSoLinger(false, -1);
-                    clientSocket.setSoTimeout(5000);
-                    ClientSocketThread clientSocketThread = new ClientSocketThread(clientSocket);
-                    clientSocketThread.setDaemon(true);
-                    clientSocketThread.start();
                 } catch (SocketTimeoutException timeoutEx) {
                     if (raiseExceptionOnAcceptTimeout) {
                         throw new MllpJUnitResourceTimeoutException("Timeout Accepting client connection", timeoutEx);
                     }
-                    continue;
-                } catch (IOException e) {
-                    log.warn("IOException creating Client Socket");
+                    log.warn("Timeout waiting for client connection");
+                } catch (SocketException socketEx) {
+                    log.debug("SocketException encountered accepting client connection - ignoring", socketEx);
+                    if (null == clientSocket) {
+                        continue;
+                    } else if (!clientSocket.isClosed()) {
+                        resetConnection(clientSocket);
+                        continue;
+                    } else {
+                        throw new MllpJUnitResourceException("Unexpected SocketException encountered accepting client connection", socketEx);
+                    }
+                } catch (Exception ex) {
+                    throw new MllpJUnitResourceException("Unexpected exception encountered accepting client connection", ex);
+                }
+                if (null != clientSocket) {
                     try {
-                        clientSocket.close();
-                    } catch (IOException e1) {
-                        log.warn("Exceptiong encountered closing client socket after attempting to accept connection");
+                        clientSocket.setKeepAlive(true);
+                        clientSocket.setTcpNoDelay(false);
+                        clientSocket.setSoLinger(false, -1);
+                        clientSocket.setSoTimeout(5000);
+                        ClientSocketThread clientSocketThread = new ClientSocketThread(clientSocket);
+                        clientSocketThread.setDaemon(true);
+                        clientSocketThread.start();
+                        clientSocketThreads.add(clientSocketThread);
+                    } catch (Exception unexpectedEx) {
+                        log.warn("Unexpected exception encountered configuring client socket");
+                            try {
+                                clientSocket.close();
+                            } catch (IOException ingoreEx) {
+                                log.warn("Exceptiong encountered closing client socket after attempting to accept connection", ingoreEx);
+                            }
+                        throw new MllpJUnitResourceException("Unexpected exception encountered configuring client socket", unexpectedEx);
                     }
-                    throw new MllpJUnitResourceException("IOException creating Socket", e);
                 }
             }
             log.info("No longer accepting connections - closing TCP Listener on port {}", serverSocket.getLocalPort());
@@ -599,6 +619,22 @@ public class MllpServerResource extends ExternalResource {
         public void setRaiseExceptionOnAcceptTimeout(boolean raiseExceptionOnAcceptTimeout) {
             this.raiseExceptionOnAcceptTimeout = raiseExceptionOnAcceptTimeout;
         }
+        /** Interrupts every tracked client thread, closes the listener if it is bound and still open, then calls super.interrupt(). */
+        @Override
+        public void interrupt() {
+            for (ClientSocketThread handler : clientSocketThreads) {
+                handler.interrupt();
+            }
+            boolean listenerOpen = serverSocket != null && serverSocket.isBound() && !serverSocket.isClosed();
+            if (listenerOpen) {
+                try {
+                    serverSocket.close();
+                } catch (Exception closeEx) {
+                    log.warn("Exception encountered closing server socket on interrupt", closeEx);
+                }
+            }
+            super.interrupt();
+        }
     }
 
     /**
@@ -638,8 +674,20 @@ public class MllpServerResource extends ExternalResource {
             log.info("Handling Connection: {} -> {}", localAddress, remoteAddress);
 
             try {
-                while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
-                    InputStream instream = clientSocket.getInputStream();
+                while (!isInterrupted()  &&  null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
+                    InputStream instream;
+                    try {
+                        instream = clientSocket.getInputStream();
+                    } catch (IOException ioEx) {
+                        if (clientSocket.isClosed()) {
+                            log.debug( "Client socket was closed - ignoring exception", clientSocket);
+                            break;
+                        } else {
+                            throw new MllpJUnitResourceException( "Unexpected IOException encounted getting input stream", ioEx);
+                        }
+                    } catch (Exception unexpectedEx) {
+                        throw new MllpJUnitResourceException( "Unexpected exception encounted getting input stream", unexpectedEx);
+                    }
                     String parsedHL7Message = getMessage(instream);
 
                     if (null != parsedHL7Message && parsedHL7Message.length() > 0) {
@@ -721,7 +769,7 @@ public class MllpServerResource extends ExternalResource {
                 }
             }
 
-            log.info("Connection Finished: {} -> {}", localAddress, remoteAddress);
+            log.debug("Client Connection Finished: {} -> {}", localAddress, remoteAddress);
         }
 
         /**
@@ -749,8 +797,15 @@ public class MllpServerResource extends ExternalResource {
                     }
                 }
             } catch (SocketException socketEx) {
-                log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx);
-                resetConnection(clientSocket);
+                if (clientSocket.isClosed()) {
+                    log.info("Client socket closed while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
+                } else if ( clientSocket.isConnected() ) {
+                    log.info( "SocketException encountered while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
+                    resetConnection(clientSocket);
+                } else {
+                    log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx);
+                    resetConnection(clientSocket);
+                }
                 return null;
             }
 
@@ -872,6 +927,18 @@ public class MllpServerResource extends ExternalResource {
 
             return null;
         }
+        /** Closes the client socket if it is connected and still open (failures are only logged), then calls super.interrupt(). */
+        @Override
+        public void interrupt() {
+            if (null != clientSocket  &&  clientSocket.isConnected()  &&  !clientSocket.isClosed()) {
+                try {
+                    clientSocket.close();
+                } catch (Exception ex) {
+                    log.warn("Exception encountered closing client socket on interrupt", ex);
+                }
+            }
+            super.interrupt();
+        }
     }