You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2021/04/27 08:18:21 UTC

[camel] branch main updated: [CAMEL-16417] IBM i Message Queue enhancements (#5456)

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d026fb  [CAMEL-16417] IBM i Message Queue enhancements (#5456)
0d026fb is described below

commit 0d026fbaa052ff85ed4b99dbae1de273fb1cac36
Author: Jesse Gorzinski <17...@users.noreply.github.com>
AuthorDate: Tue Apr 27 03:17:53 2021 -0500

    [CAMEL-16417] IBM i Message Queue enhancements (#5456)
    
    * IBM i Message Queue enhancements
    
    * avoid cast of header fetch
    
    Co-authored-by: Omar Al-Safi <om...@gmail.com>
    
    * readability improvement per review suggestion
    
    Co-authored-by: Omar Al-Safi <om...@gmail.com>
    
    * add ObjectHelper import
    
    * fixup sourcecheck failure
    
    Co-authored-by: Omar Al-Safi <om...@gmail.com>
---
 components/camel-jt400/pom.xml                     |  4 +--
 .../camel-jt400/src/main/docs/jt400-component.adoc | 36 ++++++++++++++++++----
 .../camel/component/jt400/Jt400Component.java      |  2 +-
 .../camel/component/jt400/Jt400Configuration.java  | 16 ++++++++++
 .../camel/component/jt400/Jt400Constants.java      |  3 ++
 .../camel/component/jt400/Jt400Endpoint.java       |  8 +++++
 .../component/jt400/Jt400MsgQueueConsumer.java     |  7 +++++
 .../component/jt400/Jt400MsgQueueProducer.java     |  8 ++++-
 8 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/components/camel-jt400/pom.xml b/components/camel-jt400/pom.xml
index f66a954..fc9b396 100644
--- a/components/camel-jt400/pom.xml
+++ b/components/camel-jt400/pom.xml
@@ -28,8 +28,8 @@
 
     <artifactId>camel-jt400</artifactId>
     <packaging>jar</packaging>
-    <name>Camel :: Java Toolbox for AS/400</name>
-    <description>Camel AS/400 support</description>
+    <name>Camel :: Java Toolbox for IBM i</name>
+    <description>Camel IBM i support</description>
 
     <properties>
     </properties>
diff --git a/components/camel-jt400/src/main/docs/jt400-component.adoc b/components/camel-jt400/src/main/docs/jt400-component.adoc
index 92d739d..e1ebc96 100644
--- a/components/camel-jt400/src/main/docs/jt400-component.adoc
+++ b/components/camel-jt400/src/main/docs/jt400-component.adoc
@@ -96,7 +96,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (32 parameters):
+=== Query Parameters (33 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -112,6 +112,7 @@ with the following path and query parameters:
 | *readTimeout* (consumer) | Timeout in millis the consumer will wait while trying to read a new message of the data queue. | 30000 | int
 | *searchType* (consumer) | Search type such as EQ for equal etc. There are 6 enums and the value can be one of: EQ, NE, LT, LE, GT, GE | EQ | SearchType
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean
+| *sendingReply* (consumer) | If true, the consumer endpoint will set the Jt400Constants.MESSAGE_REPLYTO_KEY header of the camel message for any IBM i inquiry messages received. If that message is then routed to a producer endpoint, the action will not be processed as sending a message to the queue, but rather a reply to the specific inquiry message. | true | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. |  | PollingConsumerPollStrategy
@@ -199,10 +200,19 @@ be determined, the headers will not be set
 [width="100%",cols="2m,2m,1m,5",options="header"]
 |===
 | Header constant                          | Header value                      | Type    | Description
-| Jt400Endpoint.MESSAGE_ID                 | "CamelJt400MessageID"             | String  | The message identifier
-| Jt400Endpoint.MESSAGE_FILE               | "CamelJt400MessageFile"           | String  | The message file name
-| Jt400Endpoint.MESSAGE_TYPE               | "CamelJt400MessageType"           | Integer | The message type (corresponds to constants defined in the AS400Message class)
-| Jt400Endpoint.SENDER_INFORMATION         | "SENDER_INFORMATION"              | String  | The job identifier of the sending job
+| Jt400Constants.MESSAGE_ID                | "CamelJt400MessageID"             | String  | The message identifier
+| Jt400Constants.MESSAGE_FILE              | "CamelJt400MessageFile"           | String  | The message file name
+| Jt400Constants.MESSAGE_TYPE              | "CamelJt400MessageType"           | Integer | The message type (corresponds to constants defined in the AS400Message class)
+| Jt400Constants.MESSAGE_DFT_RPY           | "CamelJt400MessageDefaultReply"   | String  | The default message reply, when the message is an inquiry message
+| Jt400Constants.MESSAGE_REPLYTO_KEY       | "CamelJt400MessageReplyToKey"     | byte[]  | The key of the message that will be replied to (if the `sendingReply` parameter is set to `true`)
+| Jt400Constants.SENDER_INFORMATION        | "SENDER_INFORMATION"              | String  | The job identifier of the sending job
+|===
+
+=== Producer headers when sending to message queues
+
+[width="100%",cols="2m,2m,1m,5"]
+|===
+| Jt400Constants.MESSAGE_REPLYTO_KEY        | "CamelJt400MessageReplyToKey"     | byte[]  | If set, and if the message body is not empty, a new message will not be sent to the provided message queue. Instead, a response will be sent to the message identified by the given key. This is set automatically when reading from the message queue if the `sendingReply` parameter is set to `true`.
 |===
 
 == Example
@@ -270,9 +280,23 @@ from("jms:queue:input")
 
 [source,java]
 -------------------------------------------------------------------------------------------------------
-from("jt400://username:password@system/lib.lib/MSGOUTQ.DTAQ")
+from("jt400://username:password@system/lib.lib/MSGOUTQ.MSGQ")
 .to("jms:queue:output");
 -------------------------------------------------------------------------------------------------------
 
+=== Replying to an inquiry message on a message queue
+
+[source,java]
+-------------------------------------------------------------------------------------------------------
+from("jt400://username:password@localhost/qsys.lib/qusrsys.lib/myq.msgq?sendingReply=true")
+.choice()
+    .when(header(Jt400Constants.MESSAGE_TYPE).isEqualTo(AS400Message.INQUIRY))
+        .process((exchange) -> {
+            String reply = "..."; // insert reply logic here
+            exchange.getIn().setBody(reply);
+        })
+        .to("jt400://username:password@localhost/qsys.lib/qusrsys.lib/myq.msgq");
+-------------------------------------------------------------------------------------------------------
+
 
 include::{page-component-version}@camel-spring-boot::page$jt400-starter.adoc[]
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
index 04f0b51..a234dee 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Component.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
  * {@link org.apache.camel.Component} to provide integration with IBM i objects (IBM i is the replacement for AS/400 and
  * iSeries servers).
  * 
- * Current implementation supports working with data queues (*DTAQ) and Program calls (*PGM)
+ * Current implementation supports working with data queues (*DTAQ), message queues (*MSGQ), and Program calls (*PGM)
  */
 @Component("jt400")
 public class Jt400Component extends DefaultComponent {
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
index 880a11e..4ee9e6c 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
@@ -162,6 +162,9 @@ public class Jt400Configuration {
     @UriParam(label = "consumer", defaultValue = "OLD")
     private MessageAction messageAction = MessageAction.OLD;
 
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean sendingReply = true;
+
     public Jt400Configuration(String endpointUri, AS400ConnectionPool connectionPool) throws URISyntaxException {
         ObjectHelper.notNull(endpointUri, "endpointUri", this);
         ObjectHelper.notNull(connectionPool, "connectionPool", this);
@@ -381,6 +384,19 @@ public class Jt400Configuration {
         this.messageAction = messageAction;
     }
 
+    public boolean isSendingReply() {
+        return this.sendingReply;
+    }
+
+    /**
+     * If true, the consumer endpoint will set the Jt400Constants.MESSAGE_REPLYTO_KEY header of the camel message for
+     * any IBM i inquiry messages received. If that message is then routed to a producer endpoint, the action will not
+     * be processed as sending a message to the queue, but rather a reply to the specific inquiry message.
+     */
+    public void setSendingReply(boolean sendingReply) {
+        this.sendingReply = sendingReply;
+    }
+
     public void setOutputFieldsIdx(String outputFieldsIdx) {
         if (outputFieldsIdx != null) {
             String[] outputArray = outputFieldsIdx.split(",");
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Constants.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Constants.java
index f420155..d1ec6d5 100755
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Constants.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Constants.java
@@ -28,4 +28,7 @@ public interface Jt400Constants {
     public static final String MESSAGE_ID = "CamelJt400MessageID";
     public static final String MESSAGE_FILE = "CamelJt400MessageFile";
     public static final String MESSAGE_TYPE = "CamelJt400MessageType";
+    public static final String MESSAGE_DFT_RPY = "CamelJt400MessageDefaultReply";
+    public static final String MESSAGE_REPLYTO_KEY = "CamelJt400MessageReplyToKey";
+
 }
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
index f4f6a5a..de7d66c 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
@@ -282,6 +282,14 @@ public class Jt400Endpoint extends ScheduledPollEndpoint implements MultipleCons
         return configuration.getMessageAction();
     }
 
+    public void setSendingReply(boolean sendingReply) {
+        configuration.setSendingReply(sendingReply);
+    }
+
+    public boolean isSendingReply() {
+        return configuration.isSendingReply();
+    }
+
     @Override
     public boolean isMultipleConsumersSupported() {
         return true;
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
index 8dac242..a5fc0d5 100755
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jt400;
 
+import com.ibm.as400.access.AS400Message;
 import com.ibm.as400.access.MessageQueue;
 import com.ibm.as400.access.QueuedMessage;
 import org.apache.camel.Exchange;
@@ -127,6 +128,12 @@ public class Jt400MsgQueueConsumer extends ScheduledPollConsumer {
         setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID, entry.getID());
         setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_FILE, entry.getFileName());
         setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_TYPE, entry.getType());
+        if (AS400Message.INQUIRY == entry.getType()) {
+            setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_DFT_RPY, entry.getDefaultReply());
+            if (getEndpoint().isSendingReply()) {
+                setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_REPLYTO_KEY, entry.getKey());
+            }
+        }
         exchange.getIn().setBody(entry.getText());
         return exchange;
     }
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueProducer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueProducer.java
index 3934734..0fd7a1d 100755
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueProducer.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueProducer.java
@@ -20,6 +20,7 @@ import com.ibm.as400.access.MessageQueue;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * {@link Producer} to send data to an IBM i message queue.
@@ -50,7 +51,12 @@ public class Jt400MsgQueueProducer extends DefaultProducer {
 
     private void process(MessageQueue queue, Exchange exchange) throws Exception {
         String msgText = exchange.getIn().getBody(String.class);
-        queue.sendInformational(msgText);
+        byte[] messageKey = exchange.getIn().getHeader(Jt400Constants.MESSAGE_REPLYTO_KEY, byte[].class);
+        if (ObjectHelper.isNotEmpty(messageKey) && ObjectHelper.isNotEmpty(msgText)) {
+            queue.reply(messageKey, msgText);
+        } else {
+            queue.sendInformational(msgText);
+        }
     }
 
 }