You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/20 10:44:40 UTC

[camel] 02/05: CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouter option

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 56cb7e3f0496cd757726cadbdb3065ab9729daea
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:37:37 2019 +0200

    CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouter option
---
 .../src/main/docs/pulsar-component.adoc            |  3 ++-
 .../camel/component/pulsar/PulsarProducer.java     |  9 ++++++--
 .../pulsar/configuration/PulsarConfiguration.java  | 14 +++++++++++++
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 24 ++++++++++++++++++++++
 4 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index fa40b34..3e19465 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -69,7 +69,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (26 parameters):
+=== Query Parameters (27 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -96,6 +96,7 @@ with the following path and query parameters:
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *maxPendingMessages* (producer) | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default is 1000. | 1000 | int
 | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
+| *messageRouter* (producer) | Set a custom Message Router. |  | MessageRouter
 | *messageRoutingMode* (producer) | Set the message routing mode for the producer. | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer | default-producer | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index f4a5646..d7c8b0a 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -25,6 +25,7 @@ import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
@@ -73,8 +74,12 @@ public class PulsarProducer extends DefaultProducer {
                     .batchingMaxMessages(configuration.getMaxPendingMessages())
                     .enableBatching(configuration.isBatchingEnabled())
                     .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType())
-                    .messageRoutingMode(configuration.getMessageRoutingMode());
+                    .compressionType(configuration.getCompressionType());
+            if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+                producerBuilder.messageRouter(configuration.getMessageRouter());
+            } else {
+            	producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+            }
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 4366166..ea34312 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
@@ -72,6 +73,8 @@ public class PulsarConfiguration {
     private CompressionType compressionType = CompressionType.NONE;
     @UriParam(label = "producer", description = "MessageRoutingMode", defaultValue = "RoundRobinPartition")
     private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
+    @UriParam(label = "producer", description = "Custom Message Router")
+    private MessageRouter messageRouter;
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -319,6 +322,17 @@ public class PulsarConfiguration {
 	public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
 		this.messageRoutingMode = messageRoutingMode;
 	}
+
+    /**
+     * Set a custom Message Router.
+     */
+	public MessageRouter getMessageRouter() {
+		return messageRouter;
+	}
+
+	public void setMessageRouter(MessageRouter messageRouter) {
+		this.messageRouter = messageRouter;
+	}
     
 
 }
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 943e2f1..ace4f65 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -667,6 +667,30 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
+         * Set a custom Message Router.
+         * 
+         * The option is a:
+         * <code>org.apache.pulsar.client.api.MessageRouter</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRouter(Object messageRouter) {
+            doSetProperty("messageRouter", messageRouter);
+            return this;
+        }
+        /**
+         * Set a custom Message Router.
+         * 
+         * The option will be converted to a
+         * <code>org.apache.pulsar.client.api.MessageRouter</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRouter(String messageRouter) {
+            doSetProperty("messageRouter", messageRouter);
+            return this;
+        }
+        /**
          * Set the message routing mode for the producer.
          * 
          * The option is a: