You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/11 11:16:41 UTC

[camel] branch camel-2.x updated: CAMEL-14047: 2.x port - Allow unset producerName to get unique name from Pulsar (#3238)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new e7be0a2  CAMEL-14047: 2.x port - Allow unset producerName to get unique name from Pulsar (#3238)
e7be0a2 is described below

commit e7be0a2219fb53f23b5dcbd48938fe27da432daf
Author: William Thompson <wi...@toasttab.com>
AuthorDate: Fri Oct 11 07:16:32 2019 -0400

    CAMEL-14047: 2.x port - Allow unset producerName to get unique name from Pulsar (#3238)
    
    * Allow null ProducerName to get unique name from Pulsar cherry-pick to 2.x
    
    * Backport test to Camel 2.x
    
    * Remove explicit null declaration
---
 .../src/main/docs/pulsar-component.adoc            |   2 +-
 .../camel/component/pulsar/PulsarProducer.java     |   8 +-
 .../pulsar/configuration/PulsarConfiguration.java  |   4 +-
 .../component/pulsar/PulsarComponentTest.java      |   2 +-
 .../PulsarProducerUndefinedProducerNameInTest.java | 107 +++++++++++++++++++++
 5 files changed, 114 insertions(+), 9 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 67e1e37..dfa4280 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -88,7 +88,7 @@ with the following path and query parameters:
 | *initialSequenceId* (producer) | Set the baseline for the sequence ids for messages published by the producer. First message will be using (initialSequenceId 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified. | -1 | long
 | *maxPendingMessages* (producer) | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default is 1000. | 1000 | int
 | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
-| *producerName* (producer) | Name of the producer | default-producer | String
+| *producerName* (producer) | Name of the producer, if unset lets Pulsar select a unique identifier | | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
 |===
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 1d6673a..f583597 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -58,13 +58,9 @@ public class PulsarProducer extends DefaultProducer {
             final String topicUri = pulsarEndpoint.getTopicUri();
             PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
             String producerName = configuration.getProducerName();
-            if (producerName == null) {
-                producerName = topicUri + "-" + Thread.currentThread().getId();
-            }
             final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint
                     .getPulsarClient()
                     .newProducer()
-                    .producerName(producerName)
                     .topic(topicUri)
                     .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
                     .blockIfQueueFull(configuration.isBlockIfQueueFull())
@@ -75,7 +71,9 @@ public class PulsarProducer extends DefaultProducer {
                     .enableBatching(configuration.isBatchingEnabled())
                     .initialSequenceId(configuration.getInitialSequenceId())
                     .compressionType(configuration.getCompressionType());
-
+            if (producerName != null) {
+                producerBuilder.producerName(producerName);
+            }
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index fa4772f..8fb3022 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -38,8 +38,8 @@ public class PulsarConfiguration {
     private int consumerQueueSize = 10;
     @UriParam(label = "consumer", defaultValue = "sole-consumer")
     private String consumerName = "sole-consumer";
-    @UriParam(label = "producer", defaultValue = "default-producer")
-    private String producerName = "default-producer";
+    @UriParam(label = "producer")
+    private String producerName;
     @UriParam(label = "consumer", defaultValue = "cons")
     private String consumerNamePrefix = "cons";
     @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
index be777ad..5764f50 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
@@ -59,7 +59,7 @@ public class PulsarComponentTest extends CamelTestSupport {
         assertEquals("cons", endpoint.getPulsarConfiguration().getConsumerNamePrefix());
         assertEquals(10, endpoint.getPulsarConfiguration().getConsumerQueueSize());
         assertEquals(1, endpoint.getPulsarConfiguration().getNumberOfConsumers());
-        assertEquals("default-producer", endpoint.getPulsarConfiguration().getProducerName());
+        assertNull(endpoint.getPulsarConfiguration().getProducerName());
         assertEquals("subs", endpoint.getPulsarConfiguration().getSubscriptionName());
         assertEquals(SubscriptionType.EXCLUSIVE, endpoint.getPulsarConfiguration().getSubscriptionType());
     }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java
new file mode 100644
index 0000000..5085730
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarProducerUndefinedProducerNameInTest extends PulsarTestSupport {
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-producer-topic";
+
+    @Produce(uri = "direct:start1")
+    private ProducerTemplate producerTemplate1;
+
+    @Produce(uri = "direct:start2")
+    private ProducerTemplate producerTemplate2;
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI
+            + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1"
+            + "&consumerName=camel-consumer"
+    )
+    private Endpoint pulsarEndpoint1;
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI)
+    private Endpoint pulsarEndpoint2;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("direct:start1").to(pulsarEndpoint1);
+                from("direct:start2").to(pulsarEndpoint2);
+
+                from(pulsarEndpoint1).to(to);
+            }
+        };
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+                .serviceUrl(getPulsarBrokerUrl())
+                .ioThreads(1)
+                .listenerThreads(1)
+                .build();
+    }
+
+    @Test
+    public void testAMessageToRouteIsSentFromBothProducersAndThenConsumed() throws Exception {
+        to.expectedMessageCount(2);
+
+        producerTemplate1.sendBody("Test First");
+        producerTemplate2.sendBody("Test Second");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+}