You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/11 10:50:22 UTC

[camel] branch master updated: CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 86e3762  CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237)
86e3762 is described below

commit 86e37622a8986b3de8efc0ad9b042aab3fd89d2d
Author: William Thompson <wi...@toasttab.com>
AuthorDate: Fri Oct 11 06:50:16 2019 -0400

    CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237)
    
    * Allow null ProducerName to get unique name from Pulsar
    
    * Remove explicit null declaration
    
    * Update javadoc, generate adoc and PulsarEndpointBuilderFactory
---
 .../src/main/docs/pulsar-component.adoc            |   2 +-
 .../camel/component/pulsar/PulsarProducer.java     |   8 +-
 .../pulsar/configuration/PulsarConfiguration.java  |   6 +-
 .../component/pulsar/PulsarComponentTest.java      |   2 +-
 .../PulsarProducerUndefinedProducerNameInTest.java | 111 +++++++++++++++++++++
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java |   3 +-
 6 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 3e19465..d2eb07a 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -98,7 +98,7 @@ with the following path and query parameters:
 | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
 | *messageRouter* (producer) | Set a custom Message Router. |  | MessageRouter
 | *messageRoutingMode* (producer) | Set the message routing mode for the producer. | RoundRobinPartition | MessageRoutingMode
-| *producerName* (producer) | Name of the producer | default-producer | String
+| *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. |  | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 5503949..efc78ba 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -57,10 +57,7 @@ public class PulsarProducer extends DefaultProducer {
             final String topicUri = pulsarEndpoint.getUri();
             PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
             String producerName = configuration.getProducerName();
-            if (producerName == null) {
-                producerName = topicUri + "-" + Thread.currentThread().getId();
-            }
-            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().producerName(producerName).topic(topicUri)
+            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
                 .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull())
                 .maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
                 .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages())
@@ -70,6 +67,9 @@ public class PulsarProducer extends DefaultProducer {
             } else {
                 producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
             }
+            if (producerName != null) {
+                producerBuilder.producerName(producerName);
+            }
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 50b9778..d6bdcb8 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -41,8 +41,8 @@ public class PulsarConfiguration {
     private int consumerQueueSize = 10;
     @UriParam(label = "consumer", defaultValue = "sole-consumer")
     private String consumerName = "sole-consumer";
-    @UriParam(label = "producer", defaultValue = "default-producer")
-    private String producerName = "default-producer";
+    @UriParam(label = "producer")
+    private String producerName;
     @UriParam(label = "consumer", defaultValue = "cons")
     private String consumerNamePrefix = "cons";
     @UriParam(label = "consumer", defaultValue = "false")
@@ -136,7 +136,7 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Name of the producer
+     * Name of the producer. If unset, lets Pulsar select a unique identifier.
      */
     public void setProducerName(String producerName) {
         this.producerName = producerName;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
index 258df10..6ef8983 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
@@ -60,7 +60,7 @@ public class PulsarComponentTest extends CamelTestSupport {
         assertEquals("cons", endpoint.getPulsarConfiguration().getConsumerNamePrefix());
         assertEquals(10, endpoint.getPulsarConfiguration().getConsumerQueueSize());
         assertEquals(1, endpoint.getPulsarConfiguration().getNumberOfConsumers());
-        assertEquals("default-producer", endpoint.getPulsarConfiguration().getProducerName());
+        assertNull(endpoint.getPulsarConfiguration().getProducerName());
         assertEquals("subs", endpoint.getPulsarConfiguration().getSubscriptionName());
         assertEquals(SubscriptionType.EXCLUSIVE, endpoint.getPulsarConfiguration().getSubscriptionType());
         assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java
new file mode 100644
index 0000000..c65d9ad
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarProducerUndefinedProducerNameInTest extends PulsarTestSupport {
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-producer-topic";
+
+    @Produce("direct:start1")
+    private ProducerTemplate producerTemplate1;
+
+    @Produce("direct:start2")
+    private ProducerTemplate producerTemplate2;
+
+    @EndpointInject("pulsar:" + TOPIC_URI
+            + "?numberOfConsumers=1"
+            + "&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription"
+            + "&consumerQueueSize=1"
+            + "&consumerName=camel-consumer"
+    )
+    private Endpoint pulsarEndpoint1;
+
+    @EndpointInject("pulsar:" + TOPIC_URI)
+    private Endpoint pulsarEndpoint2;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("direct:start1").to(pulsarEndpoint1);
+                from("direct:start2").to(pulsarEndpoint2);
+
+                from(pulsarEndpoint1).to(to);
+            }
+        };
+    }
+
+    @Override
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
+
+        registerPulsarBeans(registry);
+
+        return registry;
+    }
+
+    private void registerPulsarBeans(Registry registry) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        registry.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        registry.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+                .serviceUrl(getPulsarBrokerUrl())
+                .ioThreads(1)
+                .listenerThreads(1)
+                .build();
+    }
+
+    @Test
+    public void testAMessageToRouteIsSentFromBothProducersAndThenConsumed() throws Exception {
+        to.expectedMessageCount(2);
+
+        producerTemplate1.sendBody("Test First");
+        producerTemplate2.sendBody("Test Second");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+}
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index ace4f65..e5f4f00 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -717,7 +717,8 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
-         * Name of the producer.
+         * Name of the producer. If unset, lets Pulsar select a unique
+         * identifier.
          * 
          * The option is a: <code>java.lang.String</code> type.
          *