You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/10/02 08:49:33 UTC

[GitHub] [camel] omarsmak commented on a change in pull request #4346: CAMEL-15496: Headers and Properties support

omarsmak commented on a change in pull request #4346:
URL: https://github.com/apache/camel/pull/4346#discussion_r498692546



##########
File path: components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
##########
@@ -16,42 +16,87 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-public final class RabbitMQConstants {
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public enum RabbitMQConstants {
 
     // TODO need to change the constant which is start with camel
-    public static final String ROUTING_KEY = "rabbitmq.ROUTING_KEY";
-    public static final String EXCHANGE_OVERRIDE_NAME = "rabbitmq.EXCHANGE_OVERRIDE_NAME";
-    public static final String EXCHANGE_NAME = "rabbitmq.EXCHANGE_NAME";
-    public static final String CONTENT_TYPE = "rabbitmq.CONTENT_TYPE";
-    public static final String PRIORITY = "rabbitmq.PRIORITY";
-    public static final String DELIVERY_TAG = "rabbitmq.DELIVERY_TAG";
-    public static final String REDELIVERY_TAG = "rabbitmq.REDELIVERY_TAG";
-    public static final String CORRELATIONID = "rabbitmq.CORRELATIONID";
-    public static final String MESSAGE_ID = "rabbitmq.MESSAGE_ID";
-    public static final String DELIVERY_MODE = "rabbitmq.DELIVERY_MODE";
-    public static final String USERID = "rabbitmq.USERID";
-    public static final String CLUSTERID = "rabbitmq.CLUSTERID";
-    public static final String REQUEST_TIMEOUT = "rabbitmq.REQUEST_TIMEOUT";
-    public static final String REPLY_TO = "rabbitmq.REPLY_TO";
-    public static final String CONTENT_ENCODING = "rabbitmq.CONTENT_ENCODING";
-    public static final String TYPE = "rabbitmq.TYPE";
-    public static final String EXPIRATION = "rabbitmq.EXPIRATION";
-    public static final String TIMESTAMP = "rabbitmq.TIMESTAMP";
-    public static final String APP_ID = "rabbitmq.APP_ID";
-    public static final String REQUEUE = "rabbitmq.REQUEUE";
-    public static final String MANDATORY = "rabbitmq.MANDATORY";
-    public static final String IMMEDIATE = "rabbitmq.IMMEDIATE";
-    public static final String RABBITMQ_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
-    public static final String RABBITMQ_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
-    public static final String RABBITMQ_DIRECT_REPLY_EXCHANGE = "";
-    public static final String RABBITMQ_DIRECT_REPLY_ROUTING_KEY = "amq.rabbitmq.reply-to";
-    public static final String RABBITMQ_QUEUE_LENGTH_LIMIT_KEY = "x-max-length";
-    public static final String RABBITMQ_QUEUE_MAX_PRIORITY_KEY = "x-max-priority";
-    public static final String RABBITMQ_QUEUE_MESSAGE_TTL_KEY = "x-message-ttl";
-    public static final String RABBITMQ_QUEUE_TTL_KEY = "x-expires";
-    public static final String RABBITMQ_QUEUE_SINGLE_ACTIVE_CONSUMER_KEY = "x-single-active-consumer";
-
-    private RabbitMQConstants() {
-        // Constants class
+    ROUTING_KEY("rabbitmq_ROUTING_KEY", "The routing key that will be used when sending the message"),

Review comment:
       Any reasons why would Enum would make more sense than pure contents? I don't think description is worth to put it in the Enum which can be a pure Javadoc comment .

##########
File path: catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
##########
@@ -292,37 +296,37 @@ camel exchange then they will be set on the RabbitMQ message.
 |===
 |Property |Value
 
-|`rabbitmq.ROUTING_KEY` |The routing key that will be used when sending the message
+|`rabbitmq_ROUTING_KEY` |The routing key that will be used when sending the message
 
-|`rabbitmq.EXCHANGE_NAME` |The exchange the message was received from
+|`rabbitmq_EXCHANGE_NAME` |The exchange the message was received from
 
-|`rabbitmq.EXCHANGE_OVERRIDE_NAME` |Used for force sending the message to this exchange instead of the endpoint configured name on the producer
+|`rabbitmq_EXCHANGE_OVERRIDE_NAME` |Used for force sending the message to this exchange instead of the endpoint configured name on the producer
 
-|`rabbitmq.CONTENT_TYPE` |The contentType to set on the RabbitMQ message
+|`rabbitmq_CONTENT_TYPE` |The contentType to set on the RabbitMQ message
 
-|`rabbitmq.PRIORITY` |The priority header to set on the RabbitMQ message
+|`rabbitmq_PRIORITY` |The priority header to set on the RabbitMQ message
 
-|`rabbitmq.CORRELATIONID` |The correlationId to set on the RabbitMQ message
+|`rabbitmq_CORRELATIONID` |The correlationId to set on the RabbitMQ message
 
-|`rabbitmq.MESSAGE_ID` |The message id to set on the RabbitMQ message
+|`rabbitmq_MESSAGE_ID` |The message id to set on the RabbitMQ message
 
-|`rabbitmq.DELIVERY_MODE` |If the message should be persistent or not
+|`rabbitmq_DELIVERY_MODE` |If the message should be persistent or not
 
-|`rabbitmq.USERID` |The userId to set on the RabbitMQ message
+|`rabbitmq_USERID` |The userId to set on the RabbitMQ message
 
-|`rabbitmq.CLUSTERID` |The clusterId to set on the RabbitMQ message
+|`rabbitmq_CLUSTERID` |The clusterId to set on the RabbitMQ message
 
-|`rabbitmq.REPLY_TO` |The replyTo to set on the RabbitMQ message
+|`rabbitmq_REPLY_TO` |The replyTo to set on the RabbitMQ message
 
-|`rabbitmq.CONTENT_ENCODING` |The contentEncoding to set on the RabbitMQ message
+|`rabbitmq_CONTENT_ENCODING` |The contentEncoding to set on the RabbitMQ message

Review comment:
       I am wondering, would it be better if you are changing these headers, to follow the Camel style naming, e.g: `CamelRabbitMqContentEncoding`? 
   Also such a change, needs to be added in the migration guide as it is a breaking change for existing camel routes

##########
File path: components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
##########
@@ -49,158 +52,196 @@
     public void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
 
         if (properties.getType() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+            exchange.getIn().setHeader(RabbitMQConstants.TYPE.key(), properties.getType());
         }
         if (properties.getAppId() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+            exchange.getIn().setHeader(RabbitMQConstants.APP_ID.key(), properties.getAppId());
         }
         if (properties.getClusterId() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+            exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID.key(), properties.getClusterId());
         }
         if (properties.getContentEncoding() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+            exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING.key(), properties.getContentEncoding());
         }
         if (properties.getContentType() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+            exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE.key(), properties.getContentType());
         }
         if (properties.getCorrelationId() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+            exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID.key(), properties.getCorrelationId());
         }
         if (properties.getExpiration() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+            exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION.key(), properties.getExpiration());
         }
         if (properties.getMessageId() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+            exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID.key(), properties.getMessageId());
         }
         if (properties.getPriority() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+            exchange.getIn().setHeader(RabbitMQConstants.PRIORITY.key(), properties.getPriority());
         }
         if (properties.getReplyTo() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+            exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO.key(), properties.getReplyTo());
         }
         if (properties.getTimestamp() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+            exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP.key(), properties.getTimestamp());
         }
         if (properties.getUserId() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+            exchange.getIn().setHeader(RabbitMQConstants.USERID.key(), properties.getUserId());
         }
         if (properties.getDeliveryMode() != null) {
-            exchange.getIn().setHeader(RabbitMQConstants.DELIVERY_MODE, properties.getDeliveryMode());
+            exchange.getIn().setHeader(RabbitMQConstants.DELIVERY_MODE.key(), properties.getDeliveryMode());
         }
     }
 
     public AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
-        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
-
         Message msg;
         if (exchange.hasOut()) {
             msg = exchange.getOut();
         } else {
             msg = exchange.getIn();
         }
 
-        final Object contentType = msg.removeHeader(RabbitMQConstants.CONTENT_TYPE);
+        AMQP.BasicProperties.Builder properties = buildBasicAmqpProperties(exchange.getProperties(), msg);
+
+        final Map<String, Object> headers = msg.getHeaders();
+        // Add additional headers (if any)
+        if (additionalHeaders != null) {
+            headers.putAll(additionalHeaders);
+        }
+        Map<String, Object> filteredHeaders = new HashMap<>();
+
+        for (Map.Entry<String, Object> header : headers.entrySet()) {
+            // filter header values.
+            Object value = getValidRabbitMQHeaderValue(header.getKey(), header.getValue());
+
+            // additionally filter out the OVERRIDE header so it does not
+            // propagate
+            if ((value != null || isAllowNullHeaders()) && !header.getKey()
+                    .equals(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME.key())) {
+                boolean filteredHeader;
+                if (!allowCustomHeaders) {
+                    filteredHeader = headerFilterStrategy
+                            .applyFilterToCamelHeaders(header.getKey(), header.getValue(), exchange);
+                    if (filteredHeader) {
+                        filteredHeaders.put(header.getKey(), header.getValue());
+                    }
+                } else {
+                    filteredHeaders.put(header.getKey(), header.getValue());
+                }
+            } else if (LOG.isDebugEnabled()) {
+                if (header.getValue() == null) {
+                    LOG.debug("Ignoring header: {} with null value", header.getKey());
+                } else if (header.getKey().equals(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME.key())) {
+                    LOG.debug("Preventing header propagation: {} with value {}:", header.getKey(),
+                            header.getValue());
+                } else {
+                    LOG.debug("Ignoring header: {} of class: {} with value: {}", header.getKey(),
+                            ObjectHelper.classCanonicalName(header.getValue()), header.getValue());
+                }
+            }
+        }
+
+        properties.headers(filteredHeaders);
+
+        return properties;
+    }
+
+    private AMQP.BasicProperties.Builder buildBasicAmqpProperties(
+            Map<String, Object> exchangeProperties, Message msg) {
+        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
+
+        Object contentType = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.CONTENT_TYPE);
         if (contentType != null) {
             properties.contentType(contentType.toString());
         }
 
-        final Object priority = msg.removeHeader(RabbitMQConstants.PRIORITY);

Review comment:
       Any reason why this should be mutable by removing `final`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org