You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/01/28 09:19:04 UTC
[3/4] camel git commit: Improved Socket cleanup and added support for
bindTimeout URI paramter
Improved Socket cleanup and added support for bindTimeout URI paramter
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f09e4b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f09e4b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f09e4b3
Branch: refs/heads/master
Commit: 8f09e4b3110c70ec9896fbd051d816c294f914c4
Parents: d3bfca9
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Mon Jan 25 15:48:53 2016 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 28 09:11:37 2016 +0100
----------------------------------------------------------------------
components/camel-mllp/pom.xml | 13 --
.../camel/component/mllp/MllpEndpoint.java | 35 ++++-
.../component/mllp/MllpTcpClientProducer.java | 154 ++++++++++---------
.../component/mllp/MllpTcpServerConsumer.java | 50 +++++-
.../camel/component/mllp/impl/MllpUtil.java | 22 ++-
.../MllpTcpClientProducerBlueprintTest.java | 3 +-
.../MllpTcpServerConsumerBindTimeoutTest.java | 117 ++++++++++++++
.../mllp/MllpTcpServerConsumerTest.java | 2 +
.../junit/rule/mllp/MllpServerResource.java | 113 +++++++++++---
9 files changed, 385 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mllp/pom.xml b/components/camel-mllp/pom.xml
index e49249b..c2cfa0c 100644
--- a/components/camel-mllp/pom.xml
+++ b/components/camel-mllp/pom.xml
@@ -73,17 +73,4 @@
</dependencies>
- <!-- Ensure that test runs do no leave running JVMs -->
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <shutdown>kill</shutdown>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index e93e408..3a274ef 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -47,8 +47,8 @@ public class MllpEndpoint extends DefaultEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(MllpEndpoint.class);
- @UriPath(defaultValue = "0.0.0.0")
- String hostname = "0.0.0.0";
+ @UriPath(defaultValue = "null")
+ String hostname;
@UriPath(description = "TCP Port for connection")
int port = -1;
@@ -56,9 +56,12 @@ public class MllpEndpoint extends DefaultEndpoint {
@UriParam(defaultValue = "5")
int backlog = 5;
- @UriParam(defaultValue = "30000", description = "TCP Server only - timeout value while waiting for a TCP listener to start (milliseconds)")
+ @UriParam(defaultValue = "30000")
int bindTimeout = 30000;
+ @UriParam(defaultValue = "5000")
+ int bindRetryInterval = 5000;
+
@UriParam(defaultValue = "60000")
int acceptTimeout = 60000;
@@ -160,7 +163,9 @@ public class MllpEndpoint extends DefaultEndpoint {
}
/**
- * Hostname or IP for connection for the TCP connection
+ * Hostname or IP for connection for the TCP connection.
+ *
+ * The default value is null, which means any local IP address
*
* @param hostname Hostname or IP
*/
@@ -193,6 +198,28 @@ public class MllpEndpoint extends DefaultEndpoint {
this.backlog = backlog;
}
+ public int getBindTimeout() {
+ return bindTimeout;
+ }
+
+ /**
+ * TCP Server Only - The number of milliseconds to retry binding to a server port
+ */
+ public void setBindTimeout(int bindTimeout) {
+ this.bindTimeout = bindTimeout;
+ }
+
+ public int getBindRetryInterval() {
+ return bindRetryInterval;
+ }
+
+ /**
+ * TCP Server Only - The number of milliseconds to wait between bind attempts
+ */
+ public void setBindRetryInterval(int bindRetryInterval) {
+ this.bindRetryInterval = bindRetryInterval;
+ }
+
public int getAcceptTimeout() {
return acceptTimeout;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index f95899f..17979c4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -105,10 +105,11 @@ public class MllpTcpClientProducer extends DefaultProducer {
}
log.debug("Reading acknowledgement from external system");
- byte[] acknowledgementBytes;
+ byte[] acknowledgementBytes = null;
try {
- MllpUtil.openFrame(socket);
- acknowledgementBytes = MllpUtil.closeFrame(socket);
+ if (MllpUtil.openFrame(socket)) {
+ acknowledgementBytes = MllpUtil.closeFrame(socket);
+ }
} catch (SocketTimeoutException timeoutEx) {
exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx));
return;
@@ -117,82 +118,84 @@ public class MllpTcpClientProducer extends DefaultProducer {
return;
}
- log.debug("Populating the exchange with the acknowledgement from the external system");
- message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
-
- message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
- message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
-
- // Now, extract the acknowledgement type and check for a NACK
- byte fieldDelim = acknowledgementBytes[3];
- // First, find the beginning of the MSA segment - should be the second segment
- int msaStartIndex = -1;
- for (int i = 0; i < acknowledgementBytes.length; ++i) {
- if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
- final byte bM = 77;
- final byte bS = 83;
- final byte bC = 67;
- final byte bA = 65;
- final byte bE = 69;
- final byte bR = 82;
+ if (null != acknowledgementBytes) {
+ log.debug("Populating the exchange with the acknowledgement from the external system");
+ message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+
+ message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
+ message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
+
+ // Now, extract the acknowledgement type and check for a NACK
+ byte fieldDelim = acknowledgementBytes[3];
+ // First, find the beginning of the MSA segment - should be the second segment
+ int msaStartIndex = -1;
+ for (int i = 0; i < acknowledgementBytes.length; ++i) {
+ if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
+ final byte bM = 77;
+ final byte bS = 83;
+ final byte bC = 67;
+ final byte bA = 65;
+ final byte bE = 69;
+ final byte bR = 82;
/* We've found the start of a new segment - make sure peeking ahead
won't run off the end of the array - we need at least 7 more bytes
*/
- if (acknowledgementBytes.length > i + 7) {
- // We can safely peek ahead
- if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
- // Found the beginning of the MSA - the next two bytes should be our acknowledgement code
- msaStartIndex = i + 1;
- if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) {
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
- } else {
- String acknowledgemenTypeString;
- switch (acknowledgementBytes[i + 6]) {
- case bA:
- // We have an AA or CA- make sure that's the end of the field
- if (fieldDelim != acknowledgementBytes[i + 7]) {
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
- }
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
- } else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
- }
- break;
- case bE:
- // We have an AE or CE
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
- exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes)));
- } else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
- exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes)));
- }
- break;
- case bR:
- // We have an AR or CR
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
- exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes)));
- } else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
- exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes)));
- }
- break;
- default:
+ if (acknowledgementBytes.length > i + 7) {
+ // We can safely peek ahead
+ if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
+ // Found the beginning of the MSA - the next two bytes should be our acknowledgement code
+ msaStartIndex = i + 1;
+ if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) {
exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ } else {
+ String acknowledgemenTypeString;
+ switch (acknowledgementBytes[i + 6]) {
+ case bA:
+ // We have an AA or CA- make sure that's the end of the field
+ if (fieldDelim != acknowledgementBytes[i + 7]) {
+ exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ }
+ if (bA == acknowledgementBytes[i + 5]) {
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+ } else {
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
+ }
+ break;
+ case bE:
+ // We have an AE or CE
+ if (bA == acknowledgementBytes[i + 5]) {
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
+ exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes)));
+ } else {
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
+ exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes)));
+ }
+ break;
+ case bR:
+ // We have an AR or CR
+ if (bA == acknowledgementBytes[i + 5]) {
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
+ exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes)));
+ } else {
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
+ exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes)));
+ }
+ break;
+ default:
+ exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ }
}
- }
- break;
+ break;
+ }
}
}
- }
- }
- if (-1 == msaStartIndex) {
- // Didn't find an MSA
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ }
+ if (-1 == msaStartIndex) {
+ // Didn't find an MSA
+ exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ }
}
// Check AFTER_SEND Properties
if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
@@ -231,10 +234,15 @@ public class MllpTcpClientProducer extends DefaultProducer {
}
- SocketAddress address = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
- log.debug("Connecting to socket on {}", address);
+ InetSocketAddress socketAddress;
+ if (null == endpoint.getHostname()) {
+ socketAddress = new InetSocketAddress(endpoint.getPort());
+ } else {
+ socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
+ }
+ log.debug("Connecting to socket on {}", socketAddress);
try {
- socket.connect(address, endpoint.connectTimeout);
+ socket.connect(socketAddress, endpoint.connectTimeout);
} catch (SocketTimeoutException e) {
return e;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 05d13f5..e158dc3 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.mllp;
import java.io.IOException;
import java.io.InputStream;
+import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -93,8 +94,27 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
// Accept Timeout
serverSocket.setSoTimeout(endpoint.acceptTimeout);
- InetSocketAddress socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
- serverSocket.bind(socketAddress, endpoint.backlog);
+ InetSocketAddress socketAddress;
+ if (null == endpoint.getHostname()) {
+ socketAddress = new InetSocketAddress(endpoint.getPort());
+ } else {
+ socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
+ }
+ long startTicks = System.currentTimeMillis();
+
+ do {
+ try {
+ serverSocket.bind(socketAddress, endpoint.backlog);
+ } catch (BindException bindException) {
+ if (System.currentTimeMillis() > startTicks + endpoint.getBindTimeout()) {
+ log.error( "Failed to bind to address {} within timeout {}", socketAddress, endpoint.getBindTimeout());
+ throw bindException;
+ } else {
+ log.warn( "Failed to bind to address {} - retrying in {} milliseconds", socketAddress, endpoint.getBindRetryInterval());
+ Thread.sleep(endpoint.getBindRetryInterval());
+ }
+ }
+ } while ( !serverSocket.isBound() );
serverSocketThread = new ServerSocketThread(serverSocket);
serverSocketThread.start();
@@ -106,6 +126,12 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
protected void doStop() throws Exception {
log.debug("doStop()");
+ // Close any client sockets that are currently open
+ for (ClientSocketThread clientSocketThread: clientThreads) {
+ clientSocketThread.interrupt();
+ }
+
+
switch (serverSocketThread.getState()) {
case TERMINATED:
// This is what we hope for
@@ -195,7 +221,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
log.debug("Starting acceptor thread");
try {
- while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
+ while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
// TODO: Need to check maxConnections and figure out what to do when exceeded
Socket socket = null;
try {
@@ -370,7 +396,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
@Override
public void run() {
- while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
+ while (!isInterrupted() && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
byte[] hl7MessageBytes = null;
// Send the message on for processing and wait for the response
log.debug("Reading data ....");
@@ -379,7 +405,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
hl7MessageBytes = MllpUtil.closeFrame(clientSocket);
} else {
try {
- MllpUtil.openFrame(clientSocket);
+ if (!MllpUtil.openFrame(clientSocket)) {
+ continue;
+ }
} catch (SocketTimeoutException timeoutEx) {
// When thrown by openFrame, it indicates that no data was available - but no error
continue;
@@ -605,6 +633,18 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
}
}
}
+
+ @Override
+ public void interrupt() {
+ if (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
+ try {
+ clientSocket.close();
+ } catch (IOException ex) {
+ log.warn("Exception encoutered closing client Socket in interrupt", ex);
+ }
+ }
+ super.interrupt();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
index c1c5aec..72c2514 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
import org.apache.camel.component.mllp.MllpComponent;
@@ -67,25 +68,34 @@ public final class MllpUtil {
* @throws MllpCorruptFrameException if the MLLP Frame is corrupted in some way
* @throws MllpException for other unexpected error conditions
*/
- public static void openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException {
+ public static boolean openFrame(Socket socket) throws SocketTimeoutException, MllpCorruptFrameException, MllpException {
if (socket.isConnected() && !socket.isClosed()) {
InputStream socketInputStream = MllpUtil.getInputStream(socket);
- int readByte;
+ int readByte = -1;
try {
readByte = socketInputStream.read();
switch (readByte) {
case START_OF_BLOCK:
- return;
+ return true;
case END_OF_STREAM:
resetConnection(socket);
- return;
+ return false;
default:
// Continue on and process the out-of-frame data
}
} catch (SocketTimeoutException normaTimeoutEx) {
// Just pass this on - the caller will wrap it in a MllpTimeoutException
throw normaTimeoutEx;
+ } catch (SocketException socketEx ) {
+ if (socket.isClosed()) {
+ LOG.debug( "Socket closed while opening MLLP frame - ignoring exception", socketEx);
+ return false;
+ } else {
+ LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection");
+ MllpUtil.resetConnection(socket);
+ throw new MllpException("Unexpected Exception occurred opening MLLP frame", socketEx);
+ }
} catch (IOException unexpectedException) {
LOG.error("Unexpected Exception occurred opening MLLP frame - resetting the connection");
MllpUtil.resetConnection(socket);
@@ -152,6 +162,8 @@ public final class MllpUtil {
throw new MllpCorruptFrameException("Exception encountered looking for the beginning of the MLLP frame, and out-of-frame data had been read", outOfFrameData.toByteArray());
}
}
+
+ return false;
}
/**
@@ -319,7 +331,7 @@ public final class MllpUtil {
}
public static void resetConnection(Socket socket) {
- if (null != socket) {
+ if (null != socket && !socket.isClosed()) {
try {
socket.setSoLinger(true, 0);
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
index 4233d80..bb87146 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerBlueprintTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultComponentResolver;
import org.apache.camel.spi.ComponentResolver;
+import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
import org.apache.camel.test.junit.rule.mllp.MllpServerResource;
import org.apache.camel.util.KeyValueHolder;
@@ -36,7 +37,7 @@ import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
public class MllpTcpClientProducerBlueprintTest extends CamelBlueprintTestSupport {
@Rule
- public MllpServerResource mllpServer = new MllpServerResource();
+ public MllpServerResource mllpServer = new MllpServerResource(AvailablePortFinder.getNextAvailable());
final String sourceUri = "direct://source";
final String mockAcknowledgedUri = "mock://acknowledged";
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java
new file mode 100644
index 0000000..3ea892f
--- /dev/null
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerBindTimeoutTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
+import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.camel.test.mllp.Hl7MessageGenerator.generateMessage;
+
+public class MllpTcpServerConsumerBindTimeoutTest extends CamelTestSupport {
+ @Rule
+ public MllpClientResource mllpClient = new MllpClientResource();
+
+ @EndpointInject(uri = "mock://result")
+ MockEndpoint result;
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
+
+ context.setUseMDCLogging(true);
+ context.setName(this.getClass().getSimpleName());
+
+ return context;
+ }
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+
+ mllpClient.setMllpHost("localhost");
+ mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable());
+
+ return new RouteBuilder() {
+ int connectTimeout = 500;
+ int responseTimeout = 5000;
+
+ @Override
+ public void configure() throws Exception {
+ String routeId = "mllp-test-receiver-route";
+
+ onCompletion()
+ .toF("log:%s?level=INFO&showAll=true", routeId)
+ .log(LoggingLevel.INFO, routeId, "Test route complete");
+
+ fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d",
+ mllpClient.getMllpHost(), mllpClient.getMllpPort(), connectTimeout, responseTimeout)
+ .routeId(routeId)
+ .log(LoggingLevel.INFO, routeId, "Test route received message")
+ .to(result);
+
+ }
+ };
+ }
+
+ @Test
+ public void testReceiveSingleMessage() throws Exception {
+ result.expectedMessageCount(1);
+
+ Thread tmpThread = new Thread() {
+ public void run() {
+ try {
+ ServerSocket tmpSocket = new ServerSocket(mllpClient.getMllpPort());
+ Thread.sleep(15000);
+ tmpSocket.close();
+ } catch (Exception ex) {
+ throw new RuntimeException( "Exception caught in dummy listener", ex);
+ }
+ }
+
+ };
+
+ tmpThread.start();
+
+ context.start();
+
+ mllpClient.connect();
+
+ mllpClient.sendMessageAndWaitForAcknowledgement(generateMessage(), 10000);
+
+ assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
index c77c35f..eb6a463 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.mllp;
+import java.net.BindException;
+import java.net.ServerSocket;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
http://git-wip-us.apache.org/repos/asf/camel/blob/8f09e4b3/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
index c148310..8215d1b 100644
--- a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
+++ b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
@@ -24,6 +24,8 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
+import java.util.LinkedList;
+import java.util.List;
import java.util.regex.Pattern;
import org.junit.rules.ExternalResource;
@@ -436,11 +438,11 @@ public class MllpServerResource extends ExternalResource {
}
void resetConnection(Socket socket) {
- if (null != socket) {
+ if (null != socket && !socket.isClosed()) {
try {
socket.setSoLinger(true, 0);
- } catch (Exception ex) {
- log.warn("Exception encountered setting SO_LINGER to 0 on the socket to force a reset", ex);
+ } catch (SocketException socketEx) {
+ log.debug("SocketException encountered setting SO_LINGER to 0 on the socket to force a reset - ignoring", socketEx);
} finally {
closeConnection(socket);
}
@@ -455,6 +457,7 @@ public class MllpServerResource extends ExternalResource {
Logger log = LoggerFactory.getLogger(this.getClass());
ServerSocket serverSocket;
+ List<ClientSocketThread> clientSocketThreads = new LinkedList<>();
String listenHost = "0.0.0.0";
int listenPort;
@@ -519,30 +522,47 @@ public class MllpServerResource extends ExternalResource {
public void run() {
log.info("Accepting connections on port {}", serverSocket.getLocalPort());
this.setName("MllpServerResource$ServerSocketThread - " + serverSocket.getLocalSocketAddress().toString());
- while (isActive() && serverSocket.isBound()) {
+ while (!isInterrupted() && serverSocket.isBound() && !serverSocket.isClosed()) {
Socket clientSocket = null;
try {
clientSocket = serverSocket.accept();
- clientSocket.setKeepAlive(true);
- clientSocket.setTcpNoDelay(false);
- clientSocket.setSoLinger(false, -1);
- clientSocket.setSoTimeout(5000);
- ClientSocketThread clientSocketThread = new ClientSocketThread(clientSocket);
- clientSocketThread.setDaemon(true);
- clientSocketThread.start();
} catch (SocketTimeoutException timeoutEx) {
if (raiseExceptionOnAcceptTimeout) {
throw new MllpJUnitResourceTimeoutException("Timeout Accepting client connection", timeoutEx);
}
- continue;
- } catch (IOException e) {
- log.warn("IOException creating Client Socket");
+ log.warn("Timeout waiting for client connection");
+ } catch (SocketException socketEx) {
+ log.debug("SocketException encountered accepting client connection - ignoring", socketEx);
+ if (null == clientSocket) {
+ continue;
+ } else if (!clientSocket.isClosed()) {
+ resetConnection(clientSocket);
+ continue;
+ } else {
+ throw new MllpJUnitResourceException("Unexpected SocketException encountered accepting client connection", socketEx);
+ }
+ } catch (Exception ex) {
+ throw new MllpJUnitResourceException("Unexpected exception encountered accepting client connection", ex);
+ }
+ if (null != clientSocket) {
try {
- clientSocket.close();
- } catch (IOException e1) {
- log.warn("Exceptiong encountered closing client socket after attempting to accept connection");
+ clientSocket.setKeepAlive(true);
+ clientSocket.setTcpNoDelay(false);
+ clientSocket.setSoLinger(false, -1);
+ clientSocket.setSoTimeout(5000);
+ ClientSocketThread clientSocketThread = new ClientSocketThread(clientSocket);
+ clientSocketThread.setDaemon(true);
+ clientSocketThread.start();
+ clientSocketThreads.add(clientSocketThread);
+ } catch (Exception unexpectedEx) {
+ log.warn("Unexpected exception encountered configuring client socket");
+ try {
+ clientSocket.close();
+ } catch (IOException ingoreEx) {
+ log.warn("Exceptiong encountered closing client socket after attempting to accept connection", ingoreEx);
+ }
+ throw new MllpJUnitResourceException("Unexpected exception encountered configuring client socket", unexpectedEx);
}
- throw new MllpJUnitResourceException("IOException creating Socket", e);
}
}
log.info("No longer accepting connections - closing TCP Listener on port {}", serverSocket.getLocalPort());
@@ -599,6 +619,22 @@ public class MllpServerResource extends ExternalResource {
public void setRaiseExceptionOnAcceptTimeout(boolean raiseExceptionOnAcceptTimeout) {
this.raiseExceptionOnAcceptTimeout = raiseExceptionOnAcceptTimeout;
}
+
+ @Override
+ public void interrupt() {
+ for (ClientSocketThread clientSocketThread: clientSocketThreads) {
+ clientSocketThread.interrupt();
+ }
+
+ if (serverSocket != null && serverSocket.isBound() && !serverSocket.isClosed()) {
+ try {
+ serverSocket.close();
+ } catch (Exception ex) {
+ log.warn("Exception encountered closing server socket on interrupt", ex);
+ }
+ }
+ super.interrupt();
+ }
}
/**
@@ -638,8 +674,20 @@ public class MllpServerResource extends ExternalResource {
log.info("Handling Connection: {} -> {}", localAddress, remoteAddress);
try {
- while (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
- InputStream instream = clientSocket.getInputStream();
+ while (!isInterrupted() && null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
+ InputStream instream;
+ try {
+ instream = clientSocket.getInputStream();
+ } catch (IOException ioEx) {
+ if (clientSocket.isClosed()) {
+ log.debug( "Client socket was closed - ignoring exception", clientSocket);
+ break;
+ } else {
+ throw new MllpJUnitResourceException( "Unexpected IOException encounted getting input stream", ioEx);
+ }
+ } catch (Exception unexpectedEx) {
+ throw new MllpJUnitResourceException( "Unexpected exception encounted getting input stream", unexpectedEx);
+ }
String parsedHL7Message = getMessage(instream);
if (null != parsedHL7Message && parsedHL7Message.length() > 0) {
@@ -721,7 +769,7 @@ public class MllpServerResource extends ExternalResource {
}
}
- log.info("Connection Finished: {} -> {}", localAddress, remoteAddress);
+ log.debug("Client Connection Finished: {} -> {}", localAddress, remoteAddress);
}
/**
@@ -749,8 +797,15 @@ public class MllpServerResource extends ExternalResource {
}
}
} catch (SocketException socketEx) {
- log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx);
- resetConnection(clientSocket);
+ if (clientSocket.isClosed()) {
+ log.info("Client socket closed while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
+ } else if ( clientSocket.isConnected() ) {
+ log.info( "SocketException encountered while waiting for MLLP_ENVELOPE_START_OF_BLOCK");
+ resetConnection(clientSocket);
+ } else {
+ log.error("Unable to read from socket stream when expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx);
+ resetConnection(clientSocket);
+ }
return null;
}
@@ -872,6 +927,18 @@ public class MllpServerResource extends ExternalResource {
return null;
}
+
+ @Override
+ public void interrupt() {
+ if (null != clientSocket && clientSocket.isConnected() && !clientSocket.isClosed()) {
+ try {
+ clientSocket.close();
+ } catch (Exception ex) {
+ log.warn("Exception encountered closing client socket on interrput", ex);
+ }
+ }
+ super.interrupt();
+ }
}