You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/16 21:59:15 UTC
[6/6] camel git commit: CAMEL-10511: Updated MllpTcpClientProducer
and MllpTcpServerConsumer to consume all available data on socket - backport
to 2.17
CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket - backport to 2.17
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a53540da
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a53540da
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a53540da
Branch: refs/heads/camel-2.17.x
Commit: a53540da1e5b7550355c68dc9e98c5e6e77c109a
Parents: 55c621f
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Fri Dec 16 13:41:12 2016 -0700
Committer: Quinn Stevenson <qu...@pronoia-solutions.com>
Committed: Fri Dec 16 13:41:12 2016 -0700
----------------------------------------------------------------------
components/camel-mllp/pom.xml | 16 +
.../MllpAcknowledgementDeliveryException.java | 40 ++
.../mllp/MllpAcknowledgementException.java | 17 +-
.../MllpAcknowledgementTimeoutException.java | 69 +++
.../MllpAcknowledgementTimoutException.java | 38 --
...pplicationErrorAcknowledgementException.java | 19 +-
...plicationRejectAcknowledgementException.java | 19 +-
...MllpCommitErrorAcknowledgementException.java | 19 +-
...llpCommitRejectAcknowledgementException.java | 19 +-
.../camel/component/mllp/MllpComponent.java | 40 +-
.../camel/component/mllp/MllpConstants.java | 3 +
.../mllp/MllpCorruptFrameException.java | 40 --
.../camel/component/mllp/MllpEndpoint.java | 122 ++++-
.../camel/component/mllp/MllpException.java | 74 ++-
.../component/mllp/MllpFrameException.java | 41 ++
.../MllpInvalidAcknowledgementException.java | 16 +-
.../mllp/MllpInvalidMessageException.java | 30 ++
.../MllpNegativeAcknowledgementException.java | 32 ++
.../MllpReceiveAcknowledgementException.java | 57 ++
.../component/mllp/MllpReceiveException.java | 46 ++
.../component/mllp/MllpTcpClientProducer.java | 209 ++++----
.../component/mllp/MllpTcpServerConsumer.java | 398 ++++++++------
.../component/mllp/MllpTimeoutException.java | 40 +-
.../component/mllp/MllpWriteException.java | 17 +-
.../camel/component/mllp/impl/Hl7Util.java | 90 ++++
.../mllp/impl/MllpBufferedSocketWriter.java | 123 +++++
.../component/mllp/impl/MllpSocketReader.java | 290 ++++++++++
.../component/mllp/impl/MllpSocketUtil.java | 230 ++++++++
.../component/mllp/impl/MllpSocketWriter.java | 143 +++++
.../camel/component/mllp/impl/MllpUtil.java | 381 -------------
.../mllp/Hl7AcknowledgementGenerator.java | 9 +-
.../camel/component/mllp/MllpExceptionTest.java | 114 ++++
.../mllp/MllpProducerConsumerLoopbackTest.java | 24 +-
.../MllpTcpClientConsumerBlueprintTest.java | 36 --
...llpTcpClientProducerAcknowledgementTest.java | 207 ++++++-
...ntProducerAcknowledgementValidationTest.java | 283 ++++++++++
.../MllpTcpClientProducerBlueprintTest.java | 14 +-
...llpTcpClientProducerConnectionErrorTest.java | 165 ++++++
.../mllp/MllpTcpClientProducerTest.java | 86 ++-
...llpTcpServerConsumerAcknowledgementTest.java | 98 +++-
.../MllpTcpServerConsumerConnectionTest.java | 136 ++++-
...MllpTcpServerConsumerMessageHeadersTest.java | 158 ++++++
...pTcpServerConsumerMessageValidationTest.java | 317 +++++++++++
.../mllp/MllpTcpServerConsumerTest.java | 130 ++++-
.../MllpTcpServerConsumerTransactionTest.java | 142 +++++
.../MllpTcpServerProducerBlueprintTest.java | 35 --
.../camel/component/mllp/impl/Hl7UtilTest.java | 126 +++++
...BufferedSocketAcknowledgementWriterTest.java | 125 +++++
.../MllpBufferedSocketMessageWriterTest.java | 126 +++++
.../MllpSocketAcknowledgementReaderTest.java | 533 +++++++++++++++++++
.../MllpSocketAcknowledgementWriterTest.java | 150 ++++++
.../mllp/impl/MllpSocketMessageReaderTest.java | 527 ++++++++++++++++++
.../mllp/impl/MllpSocketMessageWriterTest.java | 150 ++++++
.../mllp/impl/MllpSocketReaderTestSupport.java | 342 ++++++++++++
.../MllpSocketUtilExceptionHandlingTest.java | 133 +++++
.../impl/MllpSocketUtilFindXxxOfBlockTest.java | 241 +++++++++
.../mllp/impl/MllpSocketUtilSocketTest.java | 288 ++++++++++
.../mllp/impl/MllpSocketWriterTestSupport.java | 132 +++++
.../junit/rule/mllp/MllpClientResource.java | 51 +-
.../junit/rule/mllp/MllpServerResource.java | 511 ++++++++++++------
.../apache/camel/test/util/PayloadBuilder.java | 211 ++++++++
.../blueprint/mllp-tcp-client-producer-test.xml | 2 +-
62 files changed, 7080 insertions(+), 1200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mllp/pom.xml b/components/camel-mllp/pom.xml
index c9ca671..a56c363 100644
--- a/components/camel-mllp/pom.xml
+++ b/components/camel-mllp/pom.xml
@@ -56,6 +56,22 @@
<artifactId>camel-test-blueprint</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-sjms</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq.tooling</groupId>
+ <artifactId>activemq-junit</artifactId>
+ <version>${activemq-version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- test logging -->
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
new file mode 100644
index 0000000..c6fc878
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer cannot deliver the MLLP Acknowledgement
+ */
+public class MllpAcknowledgementDeliveryException extends MllpAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "HL7 Acknowledgment Delivery Failed";
+
+ public MllpAcknowledgementDeliveryException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpAcknowledgementDeliveryException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
+ }
+
+ public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
index 624745c..296430b 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
@@ -20,20 +20,25 @@ package org.apache.camel.component.mllp;
* Base class for HL7 Application Acknowledgement Exceptions
*/
public abstract class MllpAcknowledgementException extends MllpException {
+
public MllpAcknowledgementException(String message) {
super(message);
}
- public MllpAcknowledgementException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
+ public MllpAcknowledgementException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
}
- public MllpAcknowledgementException(String message, Throwable cause) {
- super(message, cause);
+ public MllpAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
}
- public MllpAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
+ public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
new file mode 100644
index 0000000..8cb2822
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
+ */
+public class MllpAcknowledgementTimeoutException extends MllpAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "Timeout receiving HL7 Acknowledgement";
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message) {
+ super(EXCEPTION_MESSAGE, hl7Message);
+ }
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement);
+ }
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, cause);
+ }
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement, cause);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+ super(message, hl7Message, partialHl7Acknowledgement);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, partialHl7Acknowledgement, cause);
+ }
+
+ /**
+ * Get the HL7 acknowledgement payload associated with this exception, if any.
+ *
+ * @return If the timeout occurred before any of the HL7 Acknowledgement was received, this will be null. If the
+ * timeout occurred while attempting to complete the read of an HL7 Acknowledgement (i.e. part of the
+ * acknowledgement has already been read), this will be the partial acknowledgement payload that was read
+ * before the timeout.
+ */
+ public byte[] getHl7Acknowledgement() {
+ return super.getHl7Acknowledgement();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
deleted file mode 100644
index 9288a14..0000000
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-/**
- * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
- */
-public class MllpAcknowledgementTimoutException extends MllpTimeoutException {
- public MllpAcknowledgementTimoutException(String message) {
- super(message);
- }
-
- public MllpAcknowledgementTimoutException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
- }
-
- public MllpAcknowledgementTimoutException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public MllpAcknowledgementTimoutException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
index 7d7ecd2..c8db47a 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Application Error Acknowledgement
*/
-public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledgementException {
- public MllpApplicationErrorAcknowledgementException(String message) {
- super(message);
- }
+public class MllpApplicationErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "HL7 Application Error Acknowledgment Received";
- public MllpApplicationErrorAcknowledgementException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
+ public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
}
- public MllpApplicationErrorAcknowledgementException(String message, Throwable cause) {
- super(message, cause);
+ public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
- public MllpApplicationErrorAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
+ @Override
+ public String getAcknowledgmentType() {
+ return "AE";
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
index ef2a39d..00dd3e4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Application Reject Acknowledgement
*/
-public class MllpApplicationRejectAcknowledgementException extends MllpAcknowledgementException {
- public MllpApplicationRejectAcknowledgementException(String message) {
- super(message);
- }
+public class MllpApplicationRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "HL7 Application Reject Acknowledgment Received";
- public MllpApplicationRejectAcknowledgementException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
+ public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
}
- public MllpApplicationRejectAcknowledgementException(String message, Throwable cause) {
- super(message, cause);
+ public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
- public MllpApplicationRejectAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
+ @Override
+ public String getAcknowledgmentType() {
+ return "AR";
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
index 1269c56..bec34d9 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Commit Error Acknowledgement
*/
-public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgementException {
- public MllpCommitErrorAcknowledgementException(String message) {
- super(message);
- }
+public class MllpCommitErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "HL7 Commit Error Acknowledgment Received";
- public MllpCommitErrorAcknowledgementException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
+ public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
}
- public MllpCommitErrorAcknowledgementException(String message, Throwable cause) {
- super(message, cause);
+ public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
- public MllpCommitErrorAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
+ @Override
+ public String getAcknowledgmentType() {
+ return "CE";
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
index 99043a9..f98ee5e 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
@@ -19,20 +19,19 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Commit Reject Acknowledgement
*/
-public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgementException {
- public MllpCommitRejectAcknowledgementException(String message) {
- super(message);
- }
+public class MllpCommitRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "HL7 Commit Reject Acknowledgment Received";
- public MllpCommitRejectAcknowledgementException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
+ public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
}
- public MllpCommitRejectAcknowledgementException(String message, Throwable cause) {
- super(message, cause);
+ public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
- public MllpCommitRejectAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
+ @Override
+ public String getAcknowledgmentType() {
+ return "CR";
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
index 3a67dd3..f3e22b1 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
@@ -22,11 +22,15 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
/**
* Represents the component that manages {@link MllpEndpoint}.
*/
public class MllpComponent extends UriEndpointComponent {
- public static final String MLLP_LOG_PHI_PROPERTY = "org.apache.camel.mllp.logPHI";
+ public static final String MLLP_LOG_PHI_PROPERTY = "org.apache.camel.component.mllp.logPHI";
public MllpComponent() {
super(MllpEndpoint.class);
@@ -64,4 +68,38 @@ public class MllpComponent extends UriEndpointComponent {
return endpoint;
}
+ public static boolean isLogPhi() {
+ String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+ return Boolean.valueOf(logPhiProperty);
+ }
+
+ public static String covertToPrintFriendlyString(String hl7Message) {
+ if (hl7Message == null) {
+ return "null";
+ } else if (hl7Message.isEmpty()) {
+ return "empty";
+ }
+
+ return hl7Message.replaceAll("" + START_OF_BLOCK, "<VT>").replaceAll("" + END_OF_BLOCK, "<FS>").replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
+ }
+
+ public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes) {
+ if (hl7Bytes == null) {
+ return "null";
+ } else if (hl7Bytes.length == 0) {
+ return "";
+ }
+
+ return covertBytesToPrintFriendlyString(hl7Bytes, 0, hl7Bytes.length);
+ }
+
+ public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes, int startPosition, int length) {
+ if (null == hl7Bytes) {
+ return "null";
+ } else if (hl7Bytes.length == 0) {
+ return "";
+ }
+ return covertToPrintFriendlyString(new String(hl7Bytes, startPosition, length));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
index da5eb77..4e14da0 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
@@ -21,8 +21,11 @@ public final class MllpConstants {
public static final String MLLP_REMOTE_ADDRESS = "CamelMllpRemoteAddress";
public static final String MLLP_ACKNOWLEDGEMENT = "CamelMllpAcknowledgement";
+ public static final String MLLP_ACKNOWLEDGEMENT_STRING = "CamelMllpAcknowledgementString";
public static final String MLLP_ACKNOWLEDGEMENT_TYPE = "CamelMllpAcknowledgementType";
+ public static final String MLLP_ACKNOWLEDGEMENT_EXCEPTION = "CamelMllpAcknowledgementException";
+ public static final String MLLP_AUTO_ACKNOWLEDGE = "CamelMllpAutoAcknowledge";
/*
Connection Control Exchange Properties
- For Consumers, "SEND" => ACKNOWLEDGEMENT
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java
deleted file mode 100644
index 0310530..0000000
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCorruptFrameException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-/**
- * Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting
- * to read or write a MLLP payload.
- */
-public class MllpCorruptFrameException extends MllpException {
- public MllpCorruptFrameException(String message) {
- super(message);
- }
-
- public MllpCorruptFrameException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
- }
-
- public MllpCorruptFrameException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public MllpCorruptFrameException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index 81a2e97..3732254 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -30,14 +30,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Represents a MLLP endpoint.
+ * The MLLP component is designed to handle the MLLP protocol and provide the functionality required by Healthcare providers to communicate with other systems using the MLLP protocol.
+ *
* <p/>
* NOTE: MLLP payloads are not logged unless the logging level is set to DEBUG or TRACE to avoid introducing PHI
* into the log files. Logging of PHI can be globally disabled by setting the org.apache.camel.component.mllp.logPHI system
* property to false.
* <p/>
*/
-@UriEndpoint(scheme = "mllp", title = "mllp", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
+@UriEndpoint(scheme = "mllp", title = "MLLP", syntax = "mllp:hostname:port", consumerClass = MllpTcpServerConsumer.class, label = "mllp")
public class MllpEndpoint extends DefaultEndpoint {
public static final char START_OF_BLOCK = 0x0b; // VT (vertical tab) - decimal 11, octal 013
public static final char END_OF_BLOCK = 0x1c; // FS (file separator) - decimal 28, octal 034
@@ -54,24 +55,30 @@ public class MllpEndpoint extends DefaultEndpoint {
@UriPath @Metadata(required = "true")
int port = -1;
- @UriParam(defaultValue = "5")
+ @UriParam(label = "advanced", defaultValue = "5")
int backlog = 5;
- @UriParam(defaultValue = "30000")
+ @UriParam(label = "timeout", defaultValue = "30000")
int bindTimeout = 30000;
- @UriParam(defaultValue = "5000")
+ @UriParam(label = "timeout", defaultValue = "5000")
int bindRetryInterval = 5000;
- @UriParam(defaultValue = "60000")
+ @UriParam(label = "timeout", defaultValue = "60000")
int acceptTimeout = 60000;
- @UriParam(defaultValue = "30000")
+ @UriParam(label = "timeout", defaultValue = "30000")
int connectTimeout = 30000;
- @UriParam(defaultValue = "10000")
+ @UriParam(label = "timeout", defaultValue = "10000")
int receiveTimeout = 10000;
+ @UriParam(label = "timeout", defaultValue = "-1")
+ int maxReceiveTimeouts = -1;
+
+ @UriParam(label = "timeout", defaultValue = "500")
+ int readTimeout = 500;
+
@UriParam(defaultValue = "true")
boolean keepAlive = true;
@@ -81,16 +88,25 @@ public class MllpEndpoint extends DefaultEndpoint {
@UriParam
boolean reuseAddress;
- @UriParam
+ @UriParam(label = "advanced")
Integer receiveBufferSize;
- @UriParam
+ @UriParam(label = "advanced")
Integer sendBufferSize;
@UriParam(defaultValue = "true")
boolean autoAck = true;
- @UriParam
+ @UriParam(defaultValue = "true")
+ boolean hl7Headers = true;
+
+ @UriParam(defaultValue = "true")
+ boolean bufferWrites = true;
+
+ @UriParam(defaultValue = "false")
+ boolean validatePayload;
+
+ @UriParam(label = "codec")
String charsetName;
public MllpEndpoint(String uri, MllpComponent component) {
@@ -223,7 +239,7 @@ public class MllpEndpoint extends DefaultEndpoint {
}
/**
- * Timeout value while waiting for a TCP connection
+ * Timeout (in milliseconds) while waiting for a TCP connection
* <p/>
* TCP Server Only
*
@@ -238,7 +254,7 @@ public class MllpEndpoint extends DefaultEndpoint {
}
/**
- * Timeout value for establishing for a TCP connection
+ * Timeout (in milliseconds) for establishing a TCP connection
* <p/>
* TCP Client only
*
@@ -253,7 +269,7 @@ public class MllpEndpoint extends DefaultEndpoint {
}
/**
- * The SO_TIMEOUT value used when waiting for the start of an MLLP frame
+ * The SO_TIMEOUT value (in milliseconds) used when waiting for the start of an MLLP frame
*
* @param receiveTimeout timeout in milliseconds
*/
@@ -261,6 +277,32 @@ public class MllpEndpoint extends DefaultEndpoint {
this.receiveTimeout = receiveTimeout;
}
+ public int getMaxReceiveTimeouts() {
+ return maxReceiveTimeouts;
+ }
+
+ /**
+ * The maximum number of timeouts (specified by receiveTimeout) allowed before the TCP Connection will be reset.
+ *
+ * @param maxReceiveTimeouts maximum number of receiveTimeouts
+ */
+ public void setMaxReceiveTimeouts(int maxReceiveTimeouts) {
+ this.maxReceiveTimeouts = maxReceiveTimeouts;
+ }
+
+ public int getReadTimeout() {
+ return readTimeout;
+ }
+
+ /**
+ * The SO_TIMEOUT value (in milliseconds) used after the start of an MLLP frame has been received
+ *
+ * @param readTimeout timeout in milliseconds
+ */
+ public void setReadTimeout(int readTimeout) {
+ this.readTimeout = readTimeout;
+ }
+
public boolean isKeepAlive() {
return keepAlive;
}
@@ -305,7 +347,7 @@ public class MllpEndpoint extends DefaultEndpoint {
}
/**
- * Sets the SO_RCVBUF option to the specified value
+ * Sets the SO_RCVBUF option to the specified value (in bytes)
*
* @param receiveBufferSize the SO_RCVBUF option value. If null, the system default is used
*/
@@ -318,7 +360,7 @@ public class MllpEndpoint extends DefaultEndpoint {
}
/**
- * Sets the SO_SNDBUF option to the specified value
+ * Sets the SO_SNDBUF option to the specified value (in bytes)
*
* @param sendBufferSize the SO_SNDBUF option value. If null, the system default is used
*/
@@ -341,4 +383,52 @@ public class MllpEndpoint extends DefaultEndpoint {
this.autoAck = autoAck;
}
+ public boolean isHl7Headers() {
+ return hl7Headers;
+ }
+
+ /**
+ * Enable/Disable the automatic generation of message headers from the HL7 Message
+ *
+ * MLLP Consumers only
+ *
+ * @param hl7Headers enabled if true, otherwise disabled
+ */
+ public void setHl7Headers(boolean hl7Headers) {
+ this.hl7Headers = hl7Headers;
+ }
+
+ public boolean isValidatePayload() {
+ return validatePayload;
+ }
+
+ /**
+ * Enable/Disable the validation of HL7 Payloads
+ *
+ * If enabled, HL7 Payloads received from external systems will be validated (see Hl7Util.generateInvalidPayloadExceptionMessage for details on the validation).
+ * If an invalid payload is detected, a MllpInvalidMessageException (for consumers) or a MllpInvalidAcknowledgementException (for producers) will be thrown.
+ *
+ * @param validatePayload enabled if true, otherwise disabled
+ */
+ public void setValidatePayload(boolean validatePayload) {
+ this.validatePayload = validatePayload;
+ }
+
+ public boolean isBufferWrites() {
+ return bufferWrites;
+ }
+
+ /**
+ * Enable/Disable the buffering of MLLP Payloads before they are written to the external system
+ *
+ * If enabled, MLLP Payloads are buffered and written to the external system in a single write(byte[]) operation.
+ * If disabled, the MLLP payload will not be buffered, and three write operations will be used. The first operation
+ * will write the MLLP start-of-block character {0x0b (ASCII VT)}, the second operation will write the HL7 payload, and the
+ * third operation will write the MLLP end-of-block character and the MLLP end-of-data character {[0x1c, 0x0d] (ASCII [FS, CR])}.
+ *
+ * @param bufferWrites enabled if true, otherwise disabled
+ */
+ public void setBufferWrites(boolean bufferWrites) {
+ this.bufferWrites = bufferWrites;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
index a1bb182..0d2c1ce 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
@@ -20,30 +20,88 @@ package org.apache.camel.component.mllp;
* Base class for all MLLP Exceptions, and also used as a generic MLLP exception
*/
public class MllpException extends Exception {
- private final byte[] mllpPayload;
+ private final byte[] hl7Message;
+ private final byte[] hl7Acknowledgement;
public MllpException(String message) {
super(message);
- this.mllpPayload = null;
+ this.hl7Message = null;
+ this.hl7Acknowledgement = null;
}
- public MllpException(String message, byte[] mllpPayload) {
+ public MllpException(String message, byte[] hl7Message) {
super(message);
- this.mllpPayload = mllpPayload;
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = null;
+ }
+
+ public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message);
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
}
public MllpException(String message, Throwable cause) {
super(message, cause);
- this.mllpPayload = null;
+ this.hl7Message = null;
+ this.hl7Acknowledgement = null;
}
- public MllpException(String message, byte[] mllpPayload, Throwable cause) {
+ public MllpException(String message, byte[] hl7Message, Throwable cause) {
super(message, cause);
- this.mllpPayload = mllpPayload;
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = null;
+ }
+
+ public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, cause);
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
+ }
+
+
+ /**
+ * Get the HL7 message payload associated with this exception, if any.
+ *
+ * @return HL7 message payload
+ */
+ public byte[] getHl7Message() {
+ return hl7Message;
+ }
+
+ /**
+ * Get the HL7 acknowledgement payload associated with this exception, if any.
+ *
+ * @return HL7 acknowledgement payload
+ */
+ public byte[] getHl7Acknowledgement() {
+ return hl7Acknowledgement;
+ }
+
+ /**
+ * Override the base version of this method, and include the HL7 Message and Acknowledgement, if any
+ *
+ * @return the detail message of this MLLP Exception
+ */
+ @Override
+ public String getMessage() {
+ if (MllpComponent.isLogPhi()) {
+ return String.format("%s \n\t{hl7Message= %s} \n\t{hl7Acknowledgement= %s}",
+ super.getMessage(), MllpComponent.covertBytesToPrintFriendlyString(hl7Message), MllpComponent.covertBytesToPrintFriendlyString(hl7Acknowledgement));
+ } else {
+ return super.getMessage();
+ }
}
+ /**
+ * Return the MLLP Payload that is most likely the cause of the Exception
+ *
+ * If the HL7 Acknowledgement is present, return it. Otherwise, return the HL7 Message.
+ *
+ * @return the MLLP Payload with the framing error
+ */
public byte[] getMllpPayload() {
- return mllpPayload;
+ return (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : hl7Message;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
new file mode 100644
index 0000000..7b7a3c4
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * @deprecated - replaced by more specific exceptions. MllpTimeoutException, MllpInvalidMessageException and MllpInvalidAcknowledgementException
+ * Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting
+ * to readEnvelopedPayload or writeEnvelopedMessage a MLLP payload.
+ */
+public class MllpFrameException extends MllpException {
+ public MllpFrameException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpFrameException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+
+ public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
index 495cfae..67d5316 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidAcknowledgementException.java
@@ -20,19 +20,11 @@ package org.apache.camel.component.mllp;
* Raised when a MLLP Producer receives a HL7 Acknowledgement for which the HL7 Acknowledgement type cannot be determined.
*/
public class MllpInvalidAcknowledgementException extends MllpAcknowledgementException {
- public MllpInvalidAcknowledgementException(String message) {
- super(message);
+ public MllpInvalidAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
}
- public MllpInvalidAcknowledgementException(String message, byte[] mllpPayload) {
- super(message, mllpPayload);
- }
-
- public MllpInvalidAcknowledgementException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public MllpInvalidAcknowledgementException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, mllpPayload, cause);
+ public MllpInvalidAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
new file mode 100644
index 0000000..9cde29a
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer receives an invalid HL7 Message.
+ */
+public class MllpInvalidMessageException extends MllpException {
+ public MllpInvalidMessageException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpInvalidMessageException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
new file mode 100644
index 0000000..6b8a4f8
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Abstract base for all MLLP Negative Acknowledgements
+ */
+public abstract class MllpNegativeAcknowledgementException extends MllpAcknowledgementException {
+ public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+
+ public abstract String getAcknowledgmentType();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
new file mode 100644
index 0000000..2268137
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when the receipt of an HL7 Acknowledgement fails
+ */
+public class MllpReceiveAcknowledgementException extends MllpAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "HL7 Acknowledgment Receipt Failed";
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message) {
+ super(EXCEPTION_MESSAGE, hl7Message);
+ }
+
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, cause);
+ }
+
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
+ }
+
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
new file mode 100644
index 0000000..c755d02
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP payload cannot be received
+ */
+public class MllpReceiveException extends MllpException {
+ public MllpReceiveException(String message) {
+ super(message);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpReceiveException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a53540da/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 2e74f6c..69176bc 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -19,16 +19,20 @@ package org.apache.camel.component.mllp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketAddress;
import java.net.SocketException;
-import java.net.SocketTimeoutException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.component.mllp.impl.MllpUtil;
+import org.apache.camel.component.mllp.impl.Hl7Util;
+import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
+import org.apache.camel.component.mllp.impl.MllpSocketReader;
+import org.apache.camel.component.mllp.impl.MllpSocketUtil;
+import org.apache.camel.component.mllp.impl.MllpSocketWriter;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT;
+import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND;
@@ -46,6 +50,9 @@ public class MllpTcpClientProducer extends DefaultProducer {
Socket socket;
+ MllpSocketReader mllpSocketReader;
+ MllpSocketWriter mllpSocketWriter;
+
public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException {
super(endpoint);
log.trace("MllpTcpClientProducer(endpoint)");
@@ -64,7 +71,7 @@ public class MllpTcpClientProducer extends DefaultProducer {
protected void doStop() throws Exception {
log.trace("doStop()");
- MllpUtil.closeConnection(socket);
+ MllpSocketUtil.close(socket, log, "Stopping component");
super.doStop();
}
@@ -75,15 +82,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
// Check BEFORE_SEND Properties
if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) {
- MllpUtil.resetConnection(socket);
+ MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class));
return;
} else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) {
- MllpUtil.closeConnection(socket);
+ MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class));
+ return;
}
- Exception connectionException = checkConnection();
- if (null != connectionException) {
- exchange.setException(connectionException);
+ // Establish a connection if needed
+ try {
+ checkConnection();
+ } catch (IOException ioEx) {
+ exchange.setException(ioEx);
return;
}
@@ -94,95 +104,118 @@ public class MllpTcpClientProducer extends DefaultProducer {
message = exchange.getIn();
}
+ message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
+ message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress().toString());
+
+ // Send the message to the external system
byte[] hl7MessageBytes = message.getMandatoryBody(byte[].class);
+ byte[] acknowledgementBytes = null;
- log.debug("Sending message to external system");
try {
- MllpUtil.writeFramedPayload(socket, hl7MessageBytes);
+ log.debug("Sending message to external system");
+ mllpSocketWriter.writeEnvelopedPayload(hl7MessageBytes, null);
+ log.debug("Reading acknowledgement from external system");
+ acknowledgementBytes = mllpSocketReader.readEnvelopedPayload(hl7MessageBytes);
+ } catch (MllpWriteException writeEx) {
+ MllpSocketUtil.reset(socket, log, writeEx.getMessage());
+ exchange.setException(writeEx);
+ return;
+ } catch (MllpReceiveException ackReceiveEx) {
+ MllpSocketUtil.reset(socket, log, ackReceiveEx.getMessage());
+ exchange.setException(ackReceiveEx);
+ return;
} catch (MllpException mllpEx) {
+ Throwable mllpExCause = mllpEx.getCause();
+ if (mllpExCause != null && mllpExCause instanceof IOException) {
+ MllpSocketUtil.reset(socket, log, mllpEx.getMessage());
+ }
exchange.setException(mllpEx);
return;
}
- log.debug("Reading acknowledgement from external system");
- byte[] acknowledgementBytes = null;
- try {
- if (MllpUtil.openFrame(socket)) {
- acknowledgementBytes = MllpUtil.closeFrame(socket);
+ log.debug("Populating message headers with the acknowledgement from the external system");
+ message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, IOHelper.getCharsetName(exchange, true)));
+
+ if (endpoint.validatePayload) {
+ String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
+ if (exceptionMessage != null) {
+ exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, hl7MessageBytes, acknowledgementBytes));
+ return;
}
- } catch (SocketTimeoutException timeoutEx) {
- exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx));
- return;
+ }
+
+ log.debug("Processing the acknowledgement from the external system");
+ try {
+ String acknowledgementType = processAcknowledgment(hl7MessageBytes, acknowledgementBytes);
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementType);
} catch (MllpException mllpEx) {
exchange.setException(mllpEx);
return;
}
- if (null != acknowledgementBytes) {
- log.debug("Populating the exchange with the acknowledgement from the external system");
- message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+ // Check AFTER_SEND Properties
+ if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
+ MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class));
+ } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
+ MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class));
+ }
+ }
- message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
- message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
+ private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException {
+ String acknowledgementType = "";
- // Now, extract the acknowledgement type and check for a NACK
- byte fieldDelim = acknowledgementBytes[3];
+ if (hl7AcknowledgementBytes != null && hl7AcknowledgementBytes.length > 3) {
+ // Extract the acknowledgement type and check for a NACK
+ byte fieldDelim = hl7AcknowledgementBytes[3];
// First, find the beginning of the MSA segment - should be the second segment
int msaStartIndex = -1;
- for (int i = 0; i < acknowledgementBytes.length; ++i) {
- if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
+ for (int i = 0; i < hl7AcknowledgementBytes.length; ++i) {
+ if (SEGMENT_DELIMITER == hl7AcknowledgementBytes[i]) {
final byte bM = 77;
final byte bS = 83;
final byte bC = 67;
final byte bA = 65;
final byte bE = 69;
final byte bR = 82;
- /* We've found the start of a new segment - make sure peeking ahead
- won't run off the end of the array - we need at least 7 more bytes
- */
- if (acknowledgementBytes.length > i + 7) {
+ /* We've found the start of a new segment - make sure peeking ahead
+ won't run off the end of the array - we need at least 7 more bytes
+ */
+ if (hl7AcknowledgementBytes.length > i + 7) {
// We can safely peek ahead
- if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
+ if (bM == hl7AcknowledgementBytes[i + 1] && bS == hl7AcknowledgementBytes[i + 2] && bA == hl7AcknowledgementBytes[i + 3] && fieldDelim == hl7AcknowledgementBytes[i + 4]) {
// Found the beginning of the MSA - the next two bytes should be our acknowledgement code
msaStartIndex = i + 1;
- if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) {
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ if (bA != hl7AcknowledgementBytes[i + 5] && bC != hl7AcknowledgementBytes[i + 5]) {
+ String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+ throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
} else {
- String acknowledgemenTypeString;
- switch (acknowledgementBytes[i + 6]) {
+ switch (hl7AcknowledgementBytes[i + 6]) {
case bA:
- // We have an AA or CA- make sure that's the end of the field
- if (fieldDelim != acknowledgementBytes[i + 7]) {
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
- }
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+ // We have an AA or CA
+ if (bA == hl7AcknowledgementBytes[i + 5]) {
+ acknowledgementType = "AA";
} else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
+ acknowledgementType = "CA";
}
break;
case bE:
// We have an AE or CE
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
- exchange.setException(new MllpApplicationErrorAcknowledgementException(new String(acknowledgementBytes)));
+ if (bA == hl7AcknowledgementBytes[i + 5]) {
+ throw new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
} else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
- exchange.setException(new MllpCommitErrorAcknowledgementException(new String(acknowledgementBytes)));
+ throw new MllpCommitErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
}
- break;
case bR:
// We have an AR or CR
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
- exchange.setException(new MllpApplicationRejectAcknowledgementException(new String(acknowledgementBytes)));
+ if (bA == hl7AcknowledgementBytes[i + 5]) {
+ throw new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
} else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
- exchange.setException(new MllpCommitRejectAcknowledgementException(new String(acknowledgementBytes)));
+ throw new MllpCommitRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
}
- break;
default:
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+ throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
}
}
@@ -192,18 +225,13 @@ public class MllpTcpClientProducer extends DefaultProducer {
}
}
- if (-1 == msaStartIndex) {
+ if (-1 == msaStartIndex && endpoint.validatePayload) {
// Didn't find an MSA
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes)));
+ throw new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, hl7AcknowledgementBytes);
}
}
- // Check AFTER_SEND Properties
- if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
- MllpUtil.resetConnection(socket);
- return;
- } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
- MllpUtil.closeConnection(socket);
- }
+
+ return acknowledgementType;
}
/**
@@ -211,28 +239,24 @@ public class MllpTcpClientProducer extends DefaultProducer {
*
* @return null if the connection is valid, otherwise the Exception encountered checking the connection
*/
- Exception checkConnection() {
+ void checkConnection() throws IOException {
if (null == socket || socket.isClosed() || !socket.isConnected()) {
socket = new Socket();
- try {
- socket.setKeepAlive(endpoint.keepAlive);
- socket.setTcpNoDelay(endpoint.tcpNoDelay);
- if (null != endpoint.receiveBufferSize) {
- socket.setReceiveBufferSize(endpoint.receiveBufferSize);
- }
- if (null != endpoint.sendBufferSize) {
- socket.setSendBufferSize(endpoint.sendBufferSize);
- }
- socket.setReuseAddress(endpoint.reuseAddress);
- socket.setSoLinger(false, -1);
-
- // Read Timeout
- socket.setSoTimeout(endpoint.receiveTimeout);
- } catch (SocketException e) {
- return e;
+ socket.setKeepAlive(endpoint.keepAlive);
+ socket.setTcpNoDelay(endpoint.tcpNoDelay);
+ if (null != endpoint.receiveBufferSize) {
+ socket.setReceiveBufferSize(endpoint.receiveBufferSize);
+ } else {
+ endpoint.receiveBufferSize = socket.getReceiveBufferSize();
}
-
+ if (null != endpoint.sendBufferSize) {
+ socket.setSendBufferSize(endpoint.sendBufferSize);
+ } else {
+ endpoint.sendBufferSize = socket.getSendBufferSize();
+ }
+ socket.setReuseAddress(endpoint.reuseAddress);
+ socket.setSoLinger(false, -1);
InetSocketAddress socketAddress;
if (null == endpoint.getHostname()) {
@@ -240,17 +264,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
} else {
socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
}
+
log.debug("Connecting to socket on {}", socketAddress);
- try {
- socket.connect(socketAddress, endpoint.connectTimeout);
- } catch (SocketTimeoutException e) {
- return e;
- } catch (IOException e) {
- return e;
+ socket.connect(socketAddress, endpoint.connectTimeout);
+
+ log.debug("Creating MllpSocketReader and MllpSocketWriter");
+ mllpSocketReader = new MllpSocketReader(socket, endpoint.receiveTimeout, endpoint.readTimeout, true);
+ if (endpoint.bufferWrites) {
+ mllpSocketWriter = new MllpBufferedSocketWriter(socket, false);
+ } else {
+ mllpSocketWriter = new MllpSocketWriter(socket, false);
}
}
-
- return null;
}
}