You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2019/11/20 04:57:27 UTC

[camel] branch master updated: CAMEL-14178: camel-amqp - Add support for reading AMQP anntations

This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d5aee3  CAMEL-14178: camel-amqp - Add support for reading AMQP anntations
0d5aee3 is described below

commit 0d5aee3f6ea3e95d2fadd3f6716dc87869333dde
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Wed Nov 20 13:56:22 2019 +0900

    CAMEL-14178: camel-amqp - Add support for reading AMQP anntations
---
 .../camel-amqp/src/main/docs/amqp-component.adoc   |  6 +-
 .../apache/camel/component/amqp/AMQPComponent.java | 54 +++++++++++---
 .../camel/component/amqp/AMQPConfiguration.java    | 56 +++++++++++++++
 .../apache/camel/component/amqp/AMQPConstants.java | 37 ++++++++++
 .../camel/component/amqp/AMQPJmsBinding.java       | 83 ++++++++++++++++++++++
 .../springboot/AMQPComponentConfiguration.java     | 15 ++++
 6 files changed, 240 insertions(+), 11 deletions(-)

diff --git a/components/camel-amqp/src/main/docs/amqp-component.adoc b/components/camel-amqp/src/main/docs/amqp-component.adoc
index c4393c1..4652161 100644
--- a/components/camel-amqp/src/main/docs/amqp-component.adoc
+++ b/components/camel-amqp/src/main/docs/amqp-component.adoc
@@ -40,13 +40,14 @@ xref:jms-component.adoc[JMS] component after the destination name.
 
 
 // component options: START
-The AMQP component supports 82 options, which are listed below.
+The AMQP component supports 83 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *includeAmqpAnnotations* (common) | Whether to include AMQP annotations when mapping from AMQP to Camel Message. Setting this to true will map AMQP message annotations to message headers. Due to limitations in Apache Qpid JMS API, currently delivery annotations are ignored. | false | boolean
 | *configuration* (advanced) | To use a shared JMS configuration |  | JmsConfiguration
 | *acceptMessagesWhile Stopping* (consumer) | Specifies whether the consumer accept messages while it is stopping. You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages enqueued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected, and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message may be moved at a dead letter queue on t [...]
 | *allowReplyManagerQuick Stop* (consumer) | Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by default in the regular JMS consumers but to enable for reply managers you must enable this flag. | false  [...]
@@ -275,7 +276,7 @@ When using Spring Boot make sure to use the following Maven dependency to have s
 ----
 
 
-The component supports 83 options, which are listed below.
+The component supports 84 options, which are listed below.
 
 
 
@@ -322,6 +323,7 @@ The component supports 83 options, which are listed below.
 | *camel.component.amqp.idle-consumer-limit* | Specify the limit for the number of consumers that are allowed to be idle at any given time. | 1 | Integer
 | *camel.component.amqp.idle-task-execution-limit* | Specifies the limit for idle executions of a receive task, not having received any message within its execution. If this limit is reached, the task will shut down and leave receiving to other executing tasks (in the case of dynamic scheduling; see the maxConcurrentConsumers setting). There is additional doc available from Spring. | 1 | Integer
 | *camel.component.amqp.include-all-j-m-s-x-properties* | Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc. Note: If you are using a custom headerFilterStrategy then this option does not apply. | false | Boolean
+| *camel.component.amqp.include-amqp-annotations* | Whether to include AMQP annotations when mapping from AMQP to Camel Message. Setting this to true will map AMQP message annotations to message headers. Due to limitations in Apache Qpid JMS API, currently delivery annotations are ignored. | false | Boolean
 | *camel.component.amqp.include-sent-j-m-s-message-i-d* | Only applicable when sending to JMS destination using InOnly (eg fire and forget). Enabling this option will enrich the Camel Exchange with the actual JMSMessageID that was used by the JMS client when the message was sent to the JMS destination. | false | Boolean
 | *camel.component.amqp.jms-key-format-strategy* | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of t [...]
 | *camel.component.amqp.jms-operations* | Allows you to use your own implementation of the org.springframework.jms.core.JmsOperations interface. Camel uses JmsTemplate as default. Can be used for testing purpose, but not used much as stated in the spring API docs. The option is a org.springframework.jms.core.JmsOperations type. |  | String
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
index 5dde41d..f255ce4 100644
--- a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
@@ -16,13 +16,16 @@
  */
 package org.apache.camel.component.amqp;
 
+import java.util.Map;
 import java.util.Set;
 
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.component.jms.JmsComponent;
 import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.component.jms.JmsEndpoint;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.qpid.jms.JmsConnectionFactory;
 
@@ -35,6 +38,7 @@ public class AMQPComponent extends JmsComponent {
     // Constructors
 
     public AMQPComponent() {
+        super();
     }
 
     public AMQPComponent(JmsConfiguration configuration) {
@@ -49,6 +53,20 @@ public class AMQPComponent extends JmsComponent {
         setConnectionFactory(connectionFactory);
     }
 
+    // Factory methods
+
+    public static AMQPComponent amqpComponent(String uri) {
+        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(uri);
+        connectionFactory.setTopicPrefix("topic://");
+        return new AMQPComponent(connectionFactory);
+    }
+
+    public static AMQPComponent amqpComponent(String uri, String username, String password) {
+        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, uri);
+        connectionFactory.setTopicPrefix("topic://");
+        return new AMQPComponent(connectionFactory);
+    }
+
     // Life-cycle
 
     @Override
@@ -65,18 +83,36 @@ public class AMQPComponent extends JmsComponent {
         super.doStart();
     }
 
-    // Factory methods
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        JmsEndpoint endpoint = (JmsEndpoint) super.createEndpoint(uri, remaining, parameters);
+        endpoint.setBinding(new AMQPJmsBinding(endpoint));
+        return endpoint;
+    }
 
-    public static AMQPComponent amqpComponent(String uri) {
-        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(uri);
-        connectionFactory.setTopicPrefix("topic://");
-        return new AMQPComponent(connectionFactory);
+    /**
+     * Factory method to create the default configuration instance
+     *
+     * @return a newly created configuration object which can then be further
+     *         customized
+     */
+    @Override
+    protected JmsConfiguration createConfiguration() {
+        return new AMQPConfiguration();
     }
 
-    public static AMQPComponent amqpComponent(String uri, String username, String password) {
-        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(username, password, uri);
-        connectionFactory.setTopicPrefix("topic://");
-        return new AMQPComponent(connectionFactory);
+    // Properties
+
+    /**
+     * Whether to include AMQP annotations when mapping from AMQP to Camel Message.
+     * Setting this to true will map AMQP message annotations to message headers.
+     * Due to limitations in Apache Qpid JMS API, currently delivery annotations
+     * are ignored.
+     */
+    public void setIncludeAmqpAnnotations(boolean includeAmqpAnnotations) {
+        if (getConfiguration() instanceof AMQPConfiguration) {
+            ((AMQPConfiguration) getConfiguration()).setIncludeAmqpAnnotations(includeAmqpAnnotations);
+        }
     }
 
 }
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConfiguration.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConfiguration.java
new file mode 100644
index 0000000..ddf26b0
--- /dev/null
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConfiguration.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.amqp;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class AMQPConfiguration extends JmsConfiguration {
+
+    @UriParam(label = "consumer,advanced",
+        description = "Whether to include AMQP annotations when mapping from AMQP to Camel Message."
+            + " Setting this to true will map AMQP message annotations to message headers."
+            + " Due to limitations in Apache Qpid JMS API, currently delivery annotations are ignored.")
+    private boolean includeAmqpAnnotations;
+
+    public AMQPConfiguration() {
+        super();
+    }
+
+    public AMQPConfiguration(ConnectionFactory connectionFactory) {
+        super(connectionFactory);
+    }
+
+    public boolean isIncludeAmqpAnnotations() {
+        return includeAmqpAnnotations;
+    }
+
+    /**
+     * Whether to include AMQP annotations when mapping from AMQP to Camel Message.
+     * Setting this to true will map AMQP message annotations to message headers.
+     * Due to limitations in Apache Qpid JMS API, currently delivery annotations
+     * are ignored.
+     */
+    public void setIncludeAmqpAnnotations(boolean includeAmqpAnnotations) {
+        this.includeAmqpAnnotations = includeAmqpAnnotations;
+    }
+
+}
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConstants.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConstants.java
new file mode 100644
index 0000000..885d62b
--- /dev/null
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConstants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.amqp;
+
+/**
+ * AMQP constants
+ */
+public final class AMQPConstants {
+
+    /**
+     * AMQP message annotation prefix
+     */
+    public static final String JMS_AMQP_MA_PREFIX = "JMS_AMQP_MA_";
+
+    /**
+     * AMQP message delivery prefix
+     */
+    public static final String JMS_AMQP_DA_PREFIX = "JMS_AMQP_DA_";
+
+    private AMQPConstants() {
+        // utility class
+    }
+}
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPJmsBinding.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPJmsBinding.java
new file mode 100644
index 0000000..ae53ba0
--- /dev/null
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPJmsBinding.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.amqp;
+
+import java.util.Map;
+
+import javax.jms.Message;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.JmsBinding;
+import org.apache.camel.component.jms.JmsEndpoint;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Strategy used to convert between a Camel {@link Exchange} and {@link org.apache.camel.component.jms.JmsMessage}
+ * to and from a Qpid JMS {@link JmsMessage}
+ */
+public class AMQPJmsBinding extends JmsBinding {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQPJmsBinding.class);
+
+    private boolean includeAmqpAnnotations;
+
+    public AMQPJmsBinding(JmsEndpoint endpoint) {
+        super(endpoint);
+        if (endpoint.getConfiguration() instanceof AMQPConfiguration) {
+            includeAmqpAnnotations = ((AMQPConfiguration) endpoint.getConfiguration()).isIncludeAmqpAnnotations();
+        }
+    }
+
+    @Override
+    public Map<String, Object> extractHeadersFromJms(Message jmsMessage, Exchange exchange) {
+        Map<String, Object> headers = super.extractHeadersFromJms(jmsMessage, exchange);
+        if (!includeAmqpAnnotations) {
+            return headers;
+        }
+
+        AmqpJmsMessageFacade facade = getMessageFacade(jmsMessage);
+        if (facade == null) {
+            return headers;
+        }
+
+        // message annotations
+        facade.filterTracingAnnotations((key, value) -> {
+            LOG.trace("Extract message annotation: {} = {}", key, value);
+            headers.put(AMQPConstants.JMS_AMQP_MA_PREFIX + key, value);
+        });
+
+        // delivery annotations
+        // currently not possible to read due to the Facade API limitations
+        // https://issues.apache.org/jira/browse/QPIDJMS-153
+
+        return headers;
+    }
+
+    private AmqpJmsMessageFacade getMessageFacade(Message message) {
+        if (message instanceof JmsMessage) {
+            JmsMessage jmsMessage = (JmsMessage) message;
+            if (jmsMessage.getFacade() instanceof AmqpJmsMessageFacade) {
+                return (AmqpJmsMessageFacade) jmsMessage.getFacade();
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/platforms/spring-boot/components-starter/camel-amqp-starter/src/main/java/org/apache/camel/component/amqp/springboot/AMQPComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-amqp-starter/src/main/java/org/apache/camel/component/amqp/springboot/AMQPComponentConfiguration.java
index c2a7e73..7efaab6 100644
--- a/platforms/spring-boot/components-starter/camel-amqp-starter/src/main/java/org/apache/camel/component/amqp/springboot/AMQPComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-amqp-starter/src/main/java/org/apache/camel/component/amqp/springboot/AMQPComponentConfiguration.java
@@ -41,6 +41,13 @@ public class AMQPComponentConfiguration
      */
     private Boolean enabled;
     /**
+     * Whether to include AMQP annotations when mapping from AMQP to Camel
+     * Message. Setting this to true will map AMQP message annotations to
+     * message headers. Due to limitations in Apache Qpid JMS API, currently
+     * delivery annotations are ignored.
+     */
+    private Boolean includeAmqpAnnotations = false;
+    /**
      * To use a shared JMS configuration. The option is a
      * org.apache.camel.component.jms.JmsConfiguration type.
      */
@@ -638,6 +645,14 @@ public class AMQPComponentConfiguration
      */
     private Boolean bridgeErrorHandler = false;
 
+    public Boolean getIncludeAmqpAnnotations() {
+        return includeAmqpAnnotations;
+    }
+
+    public void setIncludeAmqpAnnotations(Boolean includeAmqpAnnotations) {
+        this.includeAmqpAnnotations = includeAmqpAnnotations;
+    }
+
     public String getConfiguration() {
         return configuration;
     }