You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/16 08:08:17 UTC
[6/8] camel git commit: CAMEL-10511: Updated MllpTcpClientProducer
and MllpTcpServerConsumer to consume all available data on socket
CAMEL-10511: Updated MllpTcpClientProducer and MllpTcpServerConsumer to consume all available data on socket
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6d58b67
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6d58b67
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6d58b67
Branch: refs/heads/camel-2.18.x
Commit: e6d58b67ccfdf024854234cfd9a1a02cde149772
Parents: 0ed067b
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Tue Dec 6 12:23:03 2016 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 16 08:58:13 2016 +0100
----------------------------------------------------------------------
.../MllpAcknowledgementDeliveryException.java | 8 +
.../mllp/MllpAcknowledgementException.java | 42 +-
.../MllpAcknowledgementTimeoutException.java | 69 +++
.../MllpAcknowledgementTimoutException.java | 30 --
...pplicationErrorAcknowledgementException.java | 7 +-
...plicationRejectAcknowledgementException.java | 7 +-
...MllpCommitErrorAcknowledgementException.java | 7 +-
...llpCommitRejectAcknowledgementException.java | 7 +-
.../camel/component/mllp/MllpComponent.java | 38 ++
.../camel/component/mllp/MllpConstants.java | 1 +
.../camel/component/mllp/MllpEndpoint.java | 40 ++
.../camel/component/mllp/MllpException.java | 81 ++-
.../component/mllp/MllpFrameException.java | 39 +-
.../mllp/MllpInvalidMessageException.java | 30 ++
.../MllpNegativeAcknowledgementException.java | 32 ++
.../MllpReceiveAcknowledgementException.java | 57 ++
.../component/mllp/MllpReceiveException.java | 46 ++
.../component/mllp/MllpTcpClientProducer.java | 208 ++++----
.../component/mllp/MllpTcpServerConsumer.java | 318 ++++++++---
.../component/mllp/MllpTimeoutException.java | 50 +-
.../component/mllp/MllpWriteException.java | 37 +-
.../AcknowledgmentSynchronizationAdapter.java | 212 --------
.../camel/component/mllp/impl/Hl7Util.java | 90 ++++
.../mllp/impl/MllpBufferedSocketWriter.java | 123 +++++
.../component/mllp/impl/MllpSocketReader.java | 290 ++++++++++
.../component/mllp/impl/MllpSocketUtil.java | 230 ++++++++
.../component/mllp/impl/MllpSocketWriter.java | 143 +++++
.../camel/component/mllp/impl/MllpUtil.java | 390 --------------
.../mllp/Hl7AcknowledgementGenerator.java | 1 +
.../mllp/MllpAcknowledgementExceptionTest.java | 127 -----
.../camel/component/mllp/MllpExceptionTest.java | 114 ++++
.../component/mllp/MllpFrameExceptionTest.java | 101 ----
.../mllp/MllpProducerConsumerLoopbackTest.java | 16 +-
.../MllpTcpClientConsumerBlueprintTest.java | 36 --
...llpTcpClientProducerAcknowledgementTest.java | 207 ++++++-
...ntProducerAcknowledgementValidationTest.java | 283 ++++++++++
.../MllpTcpClientProducerBlueprintTest.java | 14 +-
...llpTcpClientProducerConnectionErrorTest.java | 165 ++++++
.../mllp/MllpTcpClientProducerTest.java | 82 ++-
...llpTcpServerConsumerAcknowledgementTest.java | 5 +-
.../MllpTcpServerConsumerConnectionTest.java | 35 +-
...pTcpServerConsumerMessageValidationTest.java | 317 +++++++++++
.../mllp/MllpTcpServerConsumerTest.java | 126 ++++-
.../MllpTcpServerProducerBlueprintTest.java | 35 --
.../mllp/MllpTimeoutExceptionTest.java | 101 ----
.../component/mllp/MllpWriteExceptionTest.java | 101 ----
.../camel/component/mllp/impl/Hl7UtilTest.java | 126 +++++
...BufferedSocketAcknowledgementWriterTest.java | 125 +++++
.../MllpBufferedSocketMessageWriterTest.java | 126 +++++
.../MllpSocketAcknowledgementReaderTest.java | 533 +++++++++++++++++++
.../MllpSocketAcknowledgementWriterTest.java | 150 ++++++
.../mllp/impl/MllpSocketMessageReaderTest.java | 527 ++++++++++++++++++
.../mllp/impl/MllpSocketMessageWriterTest.java | 150 ++++++
.../mllp/impl/MllpSocketReaderTestSupport.java | 342 ++++++++++++
.../MllpSocketUtilExceptionHandlingTest.java | 133 +++++
.../impl/MllpSocketUtilFindXxxOfBlockTest.java | 241 +++++++++
.../mllp/impl/MllpSocketUtilSocketTest.java | 288 ++++++++++
.../mllp/impl/MllpSocketWriterTestSupport.java | 132 +++++
.../junit/rule/mllp/MllpServerResource.java | 497 +++++++++++------
.../apache/camel/test/util/PayloadBuilder.java | 211 ++++++++
.../src/test/resources/log4j2.properties | 10 +-
61 files changed, 6422 insertions(+), 1667 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
index 4ac46c9..c6fc878 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementDeliveryException.java
@@ -29,4 +29,12 @@ public class MllpAcknowledgementDeliveryException extends MllpAcknowledgementExc
public MllpAcknowledgementDeliveryException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
+
+ public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpAcknowledgementDeliveryException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
index 992d506..296430b 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementException.java
@@ -20,49 +20,25 @@ package org.apache.camel.component.mllp;
* Base class for HL7 Application Acknowledgement Exceptions
*/
public abstract class MllpAcknowledgementException extends MllpException {
- private final byte[] hl7Message;
- private final byte[] hl7Acknowledgement;
- public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ public MllpAcknowledgementException(String message) {
super(message);
- this.hl7Message = hl7Message;
- this.hl7Acknowledgement = hl7Acknowledgement;
}
- public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
- super(message, cause);
- this.hl7Message = hl7Message;
- this.hl7Acknowledgement = hl7Acknowledgement;
+ public MllpAcknowledgementException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
}
- public byte[] getHl7Message() {
- return hl7Message;
+ public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
}
- public byte[] getHl7Acknowledgement() {
- return hl7Acknowledgement;
+ public MllpAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
}
- @Override
- public String getMessage() {
- if (isLogPhi()) {
- return String.format("%s:\n\tHL7 Message: %s\n\tHL7 Acknowledgement: %s",
- super.getMessage(), covertBytesToPrintFriendlyString(hl7Message), covertBytesToPrintFriendlyString(hl7Acknowledgement));
- } else {
- return super.getMessage();
- }
+ public MllpAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
}
- @Override
- public String toString() {
- StringBuilder stringBuilder = new StringBuilder(this.getClass().getName());
-
- stringBuilder.append(": {hl7Message=")
- .append(covertBytesToPrintFriendlyString(hl7Message))
- .append(", hl7Acknowledgement=")
- .append(covertBytesToPrintFriendlyString(hl7Acknowledgement))
- .append("}");
-
- return stringBuilder.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
new file mode 100644
index 0000000..8cb2822
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimeoutException.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
+ */
+public class MllpAcknowledgementTimeoutException extends MllpAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "Timeout receiving HL7 Acknowledgement";
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message) {
+ super(EXCEPTION_MESSAGE, hl7Message);
+ }
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement);
+ }
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, cause);
+ }
+
+ public MllpAcknowledgementTimeoutException(byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, partialHl7Acknowledgement, cause);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement) {
+ super(message, hl7Message, partialHl7Acknowledgement);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+
+ public MllpAcknowledgementTimeoutException(String message, byte[] hl7Message, byte[] partialHl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, partialHl7Acknowledgement, cause);
+ }
+
+ /**
+ * Get the HL7 acknowledgement payload associated with this exception, if any.
+ *
+ * @return the partial HL7 acknowledgement that was read before the timeout occurred, or null if no
+ * acknowledgement data was received before the timeout (constructors that take no acknowledgement,
+ * and null or empty acknowledgement arrays, all result in null). The HL7 message that was awaiting
+ * acknowledgement is available from getHl7Message().
+ */
+ public byte[] getHl7Acknowledgement() {
+ return super.getHl7Acknowledgement();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
deleted file mode 100644
index a1cf147..0000000
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpAcknowledgementTimoutException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp;
-
-/**
- * Raised when a MLLP Producer does not receive a HL7 acknowledgement within the configured timespan
- */
-public class MllpAcknowledgementTimoutException extends MllpTimeoutException {
- public MllpAcknowledgementTimoutException(String message, byte[] hl7Message) {
- super(message, hl7Message);
- }
-
- public MllpAcknowledgementTimoutException(String message, byte[] hl7Message, Throwable cause) {
- super(message, hl7Message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
index 5b6fed1..c8db47a 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationErrorAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Application Error Acknowledgement
*/
-public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledgementException {
+public class MllpApplicationErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
static final String EXCEPTION_MESSAGE = "HL7 Application Error Acknowledgment Received";
public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpApplicationErrorAcknowledgementException extends MllpAcknowledg
public MllpApplicationErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
+
+ @Override
+ public String getAcknowledgmentType() {
+ return "AE";
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
index 76cbf2c..00dd3e4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpApplicationRejectAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Application Reject Acknowledgement
*/
-public class MllpApplicationRejectAcknowledgementException extends MllpAcknowledgementException {
+public class MllpApplicationRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
static final String EXCEPTION_MESSAGE = "HL7 Application Reject Acknowledgment Received";
public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpApplicationRejectAcknowledgementException extends MllpAcknowled
public MllpApplicationRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
+
+ @Override
+ public String getAcknowledgmentType() {
+ return "AR";
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
index aef1a27..bec34d9 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitErrorAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Commit Error Acknowledgement
*/
-public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgementException {
+public class MllpCommitErrorAcknowledgementException extends MllpNegativeAcknowledgementException {
static final String EXCEPTION_MESSAGE = "HL7 Commit Error Acknowledgment Received";
public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpCommitErrorAcknowledgementException extends MllpAcknowledgement
public MllpCommitErrorAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
+
+ @Override
+ public String getAcknowledgmentType() {
+ return "CE";
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
index 3907694..f98ee5e 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpCommitRejectAcknowledgementException.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.mllp;
/**
* Raised when a MLLP Producer receives a HL7 Commit Reject Acknowledgement
*/
-public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgementException {
+public class MllpCommitRejectAcknowledgementException extends MllpNegativeAcknowledgementException {
static final String EXCEPTION_MESSAGE = "HL7 Commit Reject Acknowledgment Received";
public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
@@ -29,4 +29,9 @@ public class MllpCommitRejectAcknowledgementException extends MllpAcknowledgemen
public MllpCommitRejectAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
}
+
+ @Override
+ public String getAcknowledgmentType() {
+ return "CR";
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
index b23a421..f3e22b1 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpComponent.java
@@ -22,6 +22,10 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
/**
* Represents the component that manages {@link MllpEndpoint}.
*/
@@ -64,4 +68,38 @@ public class MllpComponent extends UriEndpointComponent {
return endpoint;
}
+ public static boolean isLogPhi() {
+ String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
+ return Boolean.valueOf(logPhiProperty);
+ }
+
+ public static String covertToPrintFriendlyString(String hl7Message) {
+ if (hl7Message == null) {
+ return "null";
+ } else if (hl7Message.isEmpty()) {
+ return "empty";
+ }
+
+ return hl7Message.replaceAll("" + START_OF_BLOCK, "<VT>").replaceAll("" + END_OF_BLOCK, "<FS>").replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
+ }
+
+ public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes) {
+ if (hl7Bytes == null) {
+ return "null";
+ } else if (hl7Bytes.length == 0) {
+ return "";
+ }
+
+ return covertBytesToPrintFriendlyString(hl7Bytes, 0, hl7Bytes.length);
+ }
+
+ public static String covertBytesToPrintFriendlyString(byte[] hl7Bytes, int startPosition, int length) {
+ if (null == hl7Bytes) {
+ return "null";
+ } else if (hl7Bytes.length == 0) {
+ return "";
+ }
+ return covertToPrintFriendlyString(new String(hl7Bytes, startPosition, length));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
index 8275a0e..4e14da0 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConstants.java
@@ -21,6 +21,7 @@ public final class MllpConstants {
public static final String MLLP_REMOTE_ADDRESS = "CamelMllpRemoteAddress";
public static final String MLLP_ACKNOWLEDGEMENT = "CamelMllpAcknowledgement";
+ public static final String MLLP_ACKNOWLEDGEMENT_STRING = "CamelMllpAcknowledgementString";
public static final String MLLP_ACKNOWLEDGEMENT_TYPE = "CamelMllpAcknowledgementType";
public static final String MLLP_ACKNOWLEDGEMENT_EXCEPTION = "CamelMllpAcknowledgementException";
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index ae2e01f..3732254 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -100,6 +100,12 @@ public class MllpEndpoint extends DefaultEndpoint {
@UriParam(defaultValue = "true")
boolean hl7Headers = true;
+ @UriParam(defaultValue = "true")
+ boolean bufferWrites = true;
+
+ @UriParam(defaultValue = "false")
+ boolean validatePayload;
+
@UriParam(label = "codec")
String charsetName;
@@ -391,4 +397,38 @@ public class MllpEndpoint extends DefaultEndpoint {
public void setHl7Headers(boolean hl7Headers) {
this.hl7Headers = hl7Headers;
}
+
+ public boolean isValidatePayload() {
+ return validatePayload;
+ }
+
+ /**
+ * Enable/Disable the validation of HL7 Payloads
+ *
+ * If enabled, HL7 Payloads received from external systems will be validated (see Hl7Util.generateInvalidPayloadExceptionMessage for details on the validation).
+ * If an invalid payload is detected, a MllpInvalidMessageException (for consumers) or a MllpInvalidAcknowledgementException will be thrown.
+ *
+ * @param validatePayload enabled if true, otherwise disabled
+ */
+ public void setValidatePayload(boolean validatePayload) {
+ this.validatePayload = validatePayload;
+ }
+
+ public boolean isBufferWrites() {
+ return bufferWrites;
+ }
+
+ /**
+ * Enable/Disable the buffering of MLLP payload writes
+ *
+ * If enabled, MLLP Payloads are buffered and written to the external system in a single write(byte[]) operation.
+ * If disabled, the MLLP payload will not be buffered, and three write operations will be used. The first operation
+ * will write the MLLP start-of-block character {0x0b (ASCII VT)}, the second operation will write the HL7 payload, and the
+ * third operation will write the MLLP end-of-block character and the MLLP end-of-data character {[0x1c, 0x0d] (ASCII [FS, CR])}.
+ *
+ * @param bufferWrites enabled if true, otherwise disabled
+ */
+ public void setBufferWrites(boolean bufferWrites) {
+ this.bufferWrites = bufferWrites;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
index f0d5e84..0d2c1ce 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpException.java
@@ -20,25 +20,88 @@ package org.apache.camel.component.mllp;
* Base class for all MLLP Exceptions, and also used as a generic MLLP exception
*/
public class MllpException extends Exception {
+ private final byte[] hl7Message;
+ private final byte[] hl7Acknowledgement;
+
public MllpException(String message) {
super(message);
+ this.hl7Message = null;
+ this.hl7Acknowledgement = null;
+ }
+
+ public MllpException(String message, byte[] hl7Message) {
+ super(message);
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = null;
+ }
+
+ public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message);
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
}
public MllpException(String message, Throwable cause) {
super(message, cause);
+ this.hl7Message = null;
+ this.hl7Acknowledgement = null;
}
- public boolean isLogPhi() {
- String logPhiProperty = System.getProperty(MllpComponent.MLLP_LOG_PHI_PROPERTY, "true");
- return Boolean.valueOf(logPhiProperty);
+ public MllpException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, cause);
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = null;
}
- protected String covertBytesToPrintFriendlyString(byte[] hl7Bytes) {
- if (null == hl7Bytes) {
- return "null";
- } else if (hl7Bytes.length == 0) {
- return "";
+ public MllpException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, cause);
+ this.hl7Message = (hl7Message != null && hl7Message.length > 0) ? hl7Message : null;
+ this.hl7Acknowledgement = (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : null;
+ }
+
+
+ /**
+ * Get the HL7 message payload associated with this exception, if any.
+ *
+ * @return HL7 message payload
+ */
+ public byte[] getHl7Message() {
+ return hl7Message;
+ }
+
+ /**
+ * Get the HL7 acknowledgement payload associated with this exception, if any.
+ *
+ * @return HL7 acknowledgement payload
+ */
+ public byte[] getHl7Acknowledgement() {
+ return hl7Acknowledgement;
+ }
+
+ /**
+ * Override the base version of this method, and include the HL7 Message and Acknowledgement, if any
+ *
+ * @return the detail message of this MLLP Exception
+ */
+ @Override
+ public String getMessage() {
+ if (MllpComponent.isLogPhi()) {
+ return String.format("%s \n\t{hl7Message= %s} \n\t{hl7Acknowledgement= %s}",
+ super.getMessage(), MllpComponent.covertBytesToPrintFriendlyString(hl7Message), MllpComponent.covertBytesToPrintFriendlyString(hl7Acknowledgement));
+ } else {
+ return super.getMessage();
}
- return new String(hl7Bytes).replaceAll("\r", "<CR>").replaceAll("\n", "<LF>");
}
+
+ /**
+ * Return the MLLP Payload that is most likely the cause of the Exception
+ *
+ * If the HL7 Acknowledgement is present, return it. Otherwise, return the HL7 Message.
+ *
+ * @return the MLLP Payload with the framing error
+ */
+ public byte[] getMllpPayload() {
+ return (hl7Acknowledgement != null && hl7Acknowledgement.length > 0) ? hl7Acknowledgement : hl7Message;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
index 43b1281..7b7a3c4 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpFrameException.java
@@ -17,44 +17,25 @@
package org.apache.camel.component.mllp;
/**
+ * @deprecated - replaced by more specific exceptions. MllpTimeoutException, MllpInvalidMessageException and MllpInvalidAcknowledgementException
* Raised when a MLLP Producer or consumer encounters a corrupt MLLP Frame while attempting
- * to read or write a MLLP payload.
+ * to readEnvelopedPayload or writeEnvelopedMessage a MLLP payload.
*/
public class MllpFrameException extends MllpException {
- private final byte[] mllpPayload;
-
- public MllpFrameException(String message, byte[] mllpPayload) {
- super(message);
- this.mllpPayload = mllpPayload;
- }
-
- public MllpFrameException(String message, byte[] mllpPayload, Throwable cause) {
- super(message, cause);
- this.mllpPayload = mllpPayload;
+ public MllpFrameException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
}
- public byte[] getMllpPayload() {
- return mllpPayload;
+ public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
}
- @Override
- public String getMessage() {
- if (isLogPhi()) {
- return String.format("%s:\n\tMLLP Payload: %s", super.getMessage(), covertBytesToPrintFriendlyString(mllpPayload));
- } else {
- return super.getMessage();
- }
+ public MllpFrameException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
}
- @Override
- public String toString() {
- StringBuilder stringBuilder = new StringBuilder(this.getClass().getName());
-
- stringBuilder.append(": {mllpPayload=")
- .append(covertBytesToPrintFriendlyString(mllpPayload))
- .append("}");
-
- return stringBuilder.toString();
+ public MllpFrameException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
new file mode 100644
index 0000000..9cde29a
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpInvalidMessageException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP Consumer receives an invalid HL7 Message.
+ */
+public class MllpInvalidMessageException extends MllpException {
+ public MllpInvalidMessageException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpInvalidMessageException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
new file mode 100644
index 0000000..6b8a4f8
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpNegativeAcknowledgementException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Abstract base for all MLLP Negative Acknowledgements
+ */
+public abstract class MllpNegativeAcknowledgementException extends MllpAcknowledgementException {
+ public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpNegativeAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+
+ public abstract String getAcknowledgmentType();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
new file mode 100644
index 0000000..2268137
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveAcknowledgementException.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when the receipt of the HL7 Acknowledgement for a sent HL7 Message fails
+ */
+public class MllpReceiveAcknowledgementException extends MllpAcknowledgementException {
+ static final String EXCEPTION_MESSAGE = "HL7 Acknowledgment Receipt Failed";
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message) {
+ super(EXCEPTION_MESSAGE, hl7Message);
+ }
+
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, cause);
+ }
+
+
+ public MllpReceiveAcknowledgementException(byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(EXCEPTION_MESSAGE, hl7Message, hl7Acknowledgement, cause);
+ }
+
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+ public MllpReceiveAcknowledgementException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
new file mode 100644
index 0000000..c755d02
--- /dev/null
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpReceiveException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+/**
+ * Raised when a MLLP payload cannot be received
+ */
+public class MllpReceiveException extends MllpException {
+ public MllpReceiveException(String message) {
+ super(message);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message) {
+ super(message, hl7Message);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement) {
+ super(message, hl7Message, hl7Acknowledgement);
+ }
+
+ public MllpReceiveException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message, Throwable cause) {
+ super(message, hl7Message, cause);
+ }
+
+ public MllpReceiveException(String message, byte[] hl7Message, byte[] hl7Acknowledgement, Throwable cause) {
+ super(message, hl7Message, hl7Acknowledgement, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 5793aea..69176bc 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -20,14 +20,19 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
-import java.net.SocketTimeoutException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.component.mllp.impl.MllpUtil;
+import org.apache.camel.component.mllp.impl.Hl7Util;
+import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
+import org.apache.camel.component.mllp.impl.MllpSocketReader;
+import org.apache.camel.component.mllp.impl.MllpSocketUtil;
+import org.apache.camel.component.mllp.impl.MllpSocketWriter;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT;
+import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND;
import static org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND;
@@ -45,6 +50,9 @@ public class MllpTcpClientProducer extends DefaultProducer {
Socket socket;
+ MllpSocketReader mllpSocketReader;
+ MllpSocketWriter mllpSocketWriter;
+
public MllpTcpClientProducer(MllpEndpoint endpoint) throws SocketException {
super(endpoint);
log.trace("MllpTcpClientProducer(endpoint)");
@@ -63,7 +71,7 @@ public class MllpTcpClientProducer extends DefaultProducer {
protected void doStop() throws Exception {
log.trace("doStop()");
- MllpUtil.closeConnection(socket);
+ MllpSocketUtil.close(socket, log, "Stopping component");
super.doStop();
}
@@ -74,15 +82,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
// Check BEFORE_SEND Properties
if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class)) {
- MllpUtil.resetConnection(socket);
+ MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class));
return;
} else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class)) {
- MllpUtil.closeConnection(socket);
+ MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_BEFORE_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class));
+ return;
}
- Exception connectionException = checkConnection();
- if (null != connectionException) {
- exchange.setException(connectionException);
+ // Establish a connection if needed
+ try {
+ checkConnection();
+ } catch (IOException ioEx) {
+ exchange.setException(ioEx);
return;
}
@@ -93,95 +104,118 @@ public class MllpTcpClientProducer extends DefaultProducer {
message = exchange.getIn();
}
+ message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
+ message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress().toString());
+
+ // Send the message to the external system
byte[] hl7MessageBytes = message.getMandatoryBody(byte[].class);
+ byte[] acknowledgementBytes = null;
- log.debug("Sending message to external system");
try {
- MllpUtil.writeFramedPayload(socket, hl7MessageBytes);
+ log.debug("Sending message to external system");
+ mllpSocketWriter.writeEnvelopedPayload(hl7MessageBytes, null);
+ log.debug("Reading acknowledgement from external system");
+ acknowledgementBytes = mllpSocketReader.readEnvelopedPayload(hl7MessageBytes);
+ } catch (MllpWriteException writeEx) {
+ MllpSocketUtil.reset(socket, log, writeEx.getMessage());
+ exchange.setException(writeEx);
+ return;
+ } catch (MllpReceiveException ackReceiveEx) {
+ MllpSocketUtil.reset(socket, log, ackReceiveEx.getMessage());
+ exchange.setException(ackReceiveEx);
+ return;
} catch (MllpException mllpEx) {
+ Throwable mllpExCause = mllpEx.getCause();
+ if (mllpExCause != null && mllpExCause instanceof IOException) {
+ MllpSocketUtil.reset(socket, log, mllpEx.getMessage());
+ }
exchange.setException(mllpEx);
return;
}
- log.debug("Reading acknowledgement from external system");
- byte[] acknowledgementBytes = null;
- try {
- if (MllpUtil.openFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout)) {
- acknowledgementBytes = MllpUtil.closeFrame(socket, endpoint.receiveTimeout, endpoint.readTimeout);
+ log.debug("Populating message headers with the acknowledgement from the external system");
+ message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_STRING, new String(acknowledgementBytes, IOHelper.getCharsetName(exchange, true)));
+
+ if (endpoint.validatePayload) {
+ String exceptionMessage = Hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
+ if (exceptionMessage != null) {
+ exchange.setException(new MllpInvalidAcknowledgementException(exceptionMessage, hl7MessageBytes, acknowledgementBytes));
+ return;
}
- } catch (SocketTimeoutException timeoutEx) {
- exchange.setException(new MllpAcknowledgementTimoutException("Acknowledgement timout", hl7MessageBytes, timeoutEx));
- return;
+ }
+
+ log.debug("Processing the acknowledgement from the external system");
+ try {
+ String acknowledgementType = processAcknowledgment(hl7MessageBytes, acknowledgementBytes);
+ message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementType);
} catch (MllpException mllpEx) {
exchange.setException(mllpEx);
return;
}
- if (null != acknowledgementBytes) {
- log.debug("Populating the exchange with the acknowledgement from the external system");
- message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
+ // Check AFTER_SEND Properties
+ if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
+ MllpSocketUtil.reset(socket, log, "Exchange property " + MLLP_RESET_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class));
+ } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
+ MllpSocketUtil.close(socket, log, "Exchange property " + MLLP_CLOSE_CONNECTION_AFTER_SEND + " = " + exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class));
+ }
+ }
- message.setHeader(MLLP_LOCAL_ADDRESS, socket.getLocalAddress().toString());
- message.setHeader(MLLP_REMOTE_ADDRESS, socket.getRemoteSocketAddress());
+ private String processAcknowledgment(byte[] hl7MessageBytes, byte[] hl7AcknowledgementBytes) throws MllpException {
+ String acknowledgementType = "";
- // Now, extract the acknowledgement type and check for a NACK
- byte fieldDelim = acknowledgementBytes[3];
+ if (hl7AcknowledgementBytes != null && hl7AcknowledgementBytes.length > 3) {
+ // Extract the acknowledgement type and check for a NACK
+ byte fieldDelim = hl7AcknowledgementBytes[3];
// First, find the beginning of the MSA segment - should be the second segment
int msaStartIndex = -1;
- for (int i = 0; i < acknowledgementBytes.length; ++i) {
- if (SEGMENT_DELIMITER == acknowledgementBytes[i]) {
+ for (int i = 0; i < hl7AcknowledgementBytes.length; ++i) {
+ if (SEGMENT_DELIMITER == hl7AcknowledgementBytes[i]) {
final byte bM = 77;
final byte bS = 83;
final byte bC = 67;
final byte bA = 65;
final byte bE = 69;
final byte bR = 82;
- /* We've found the start of a new segment - make sure peeking ahead
- won't run off the end of the array - we need at least 7 more bytes
- */
- if (acknowledgementBytes.length > i + 7) {
+ /* We've found the start of a new segment - make sure peeking ahead
+ won't run off the end of the array - we need at least 7 more bytes
+ */
+ if (hl7AcknowledgementBytes.length > i + 7) {
// We can safely peek ahead
- if (bM == acknowledgementBytes[i + 1] && bS == acknowledgementBytes[i + 2] && bA == acknowledgementBytes[i + 3] && fieldDelim == acknowledgementBytes[i + 4]) {
+ if (bM == hl7AcknowledgementBytes[i + 1] && bS == hl7AcknowledgementBytes[i + 2] && bA == hl7AcknowledgementBytes[i + 3] && fieldDelim == hl7AcknowledgementBytes[i + 4]) {
// Found the beginning of the MSA - the next two bytes should be our acknowledgement code
msaStartIndex = i + 1;
- if (bA != acknowledgementBytes[i + 5] && bC != acknowledgementBytes[i + 5]) {
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 2), hl7MessageBytes, acknowledgementBytes));
+ if (bA != hl7AcknowledgementBytes[i + 5] && bC != hl7AcknowledgementBytes[i + 5]) {
+ String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+ throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
} else {
- String acknowledgemenTypeString;
- switch (acknowledgementBytes[i + 6]) {
+ switch (hl7AcknowledgementBytes[i + 6]) {
case bA:
- // We have an AA or CA- make sure that's the end of the field
- if (fieldDelim != acknowledgementBytes[i + 7]) {
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 3), hl7MessageBytes, acknowledgementBytes));
- }
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AA");
+ // We have an AA or CA
+ if (bA == hl7AcknowledgementBytes[i + 5]) {
+ acknowledgementType = "AA";
} else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CA");
+ acknowledgementType = "CA";
}
break;
case bE:
// We have an AE or CE
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AE");
- exchange.setException(new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+ if (bA == hl7AcknowledgementBytes[i + 5]) {
+ throw new MllpApplicationErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
} else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CE");
- exchange.setException(new MllpCommitErrorAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+ throw new MllpCommitErrorAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
}
- break;
case bR:
// We have an AR or CR
- if (bA == acknowledgementBytes[i + 5]) {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "AR");
- exchange.setException(new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+ if (bA == hl7AcknowledgementBytes[i + 5]) {
+ throw new MllpApplicationRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
} else {
- message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, "CR");
- exchange.setException(new MllpCommitRejectAcknowledgementException(hl7MessageBytes, acknowledgementBytes));
+ throw new MllpCommitRejectAcknowledgementException(hl7MessageBytes, hl7AcknowledgementBytes);
}
- break;
default:
- exchange.setException(new MllpInvalidAcknowledgementException(new String(acknowledgementBytes, i + 5, 2), hl7MessageBytes, acknowledgementBytes));
+ String errorMessage = "Unsupported acknowledgement type: " + new String(hl7AcknowledgementBytes, i + 5, 2);
+ throw new MllpInvalidAcknowledgementException(errorMessage, hl7MessageBytes, hl7AcknowledgementBytes);
}
}
@@ -191,18 +225,13 @@ public class MllpTcpClientProducer extends DefaultProducer {
}
}
- if (-1 == msaStartIndex) {
+ if (-1 == msaStartIndex && endpoint.validatePayload) {
// Didn't find an MSA
- exchange.setException(new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, acknowledgementBytes));
+ throw new MllpInvalidAcknowledgementException("MSA Not found in acknowledgement", hl7MessageBytes, hl7AcknowledgementBytes);
}
}
- // Check AFTER_SEND Properties
- if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class)) {
- MllpUtil.resetConnection(socket);
- return;
- } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class)) {
- MllpUtil.closeConnection(socket);
- }
+
+ return acknowledgementType;
}
/**
@@ -210,28 +239,24 @@ public class MllpTcpClientProducer extends DefaultProducer {
*
* @return null if the connection is valid, otherwise the Exception encounted checking the connection
*/
- Exception checkConnection() {
+ void checkConnection() throws IOException {
if (null == socket || socket.isClosed() || !socket.isConnected()) {
socket = new Socket();
- try {
- socket.setKeepAlive(endpoint.keepAlive);
- socket.setTcpNoDelay(endpoint.tcpNoDelay);
- if (null != endpoint.receiveBufferSize) {
- socket.setReceiveBufferSize(endpoint.receiveBufferSize);
- }
- if (null != endpoint.sendBufferSize) {
- socket.setSendBufferSize(endpoint.sendBufferSize);
- }
- socket.setReuseAddress(endpoint.reuseAddress);
- socket.setSoLinger(false, -1);
-
- // Read Timeout
- socket.setSoTimeout(endpoint.receiveTimeout);
- } catch (SocketException e) {
- return e;
+ socket.setKeepAlive(endpoint.keepAlive);
+ socket.setTcpNoDelay(endpoint.tcpNoDelay);
+ if (null != endpoint.receiveBufferSize) {
+ socket.setReceiveBufferSize(endpoint.receiveBufferSize);
+ } else {
+ endpoint.receiveBufferSize = socket.getReceiveBufferSize();
}
-
+ if (null != endpoint.sendBufferSize) {
+ socket.setSendBufferSize(endpoint.sendBufferSize);
+ } else {
+ endpoint.sendBufferSize = socket.getSendBufferSize();
+ }
+ socket.setReuseAddress(endpoint.reuseAddress);
+ socket.setSoLinger(false, -1);
InetSocketAddress socketAddress;
if (null == endpoint.getHostname()) {
@@ -239,17 +264,18 @@ public class MllpTcpClientProducer extends DefaultProducer {
} else {
socketAddress = new InetSocketAddress(endpoint.getHostname(), endpoint.getPort());
}
+
log.debug("Connecting to socket on {}", socketAddress);
- try {
- socket.connect(socketAddress, endpoint.connectTimeout);
- } catch (SocketTimeoutException e) {
- return e;
- } catch (IOException e) {
- return e;
+ socket.connect(socketAddress, endpoint.connectTimeout);
+
+ log.debug("Creating MllpSocketReader and MllpSocketWriter");
+ mllpSocketReader = new MllpSocketReader(socket, endpoint.receiveTimeout, endpoint.readTimeout, true);
+ if (endpoint.bufferWrites) {
+ mllpSocketWriter = new MllpBufferedSocketWriter(socket, false);
+ } else {
+ mllpSocketWriter = new MllpSocketWriter(socket, false);
}
}
-
- return null;
}
}