You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/11 10:50:22 UTC
[camel] branch master updated: CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 86e3762 CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237)
86e3762 is described below
commit 86e37622a8986b3de8efc0ad9b042aab3fd89d2d
Author: William Thompson <wi...@toasttab.com>
AuthorDate: Fri Oct 11 06:50:16 2019 -0400
CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237)
* Allow null ProducerName to get unique name from Pulsar
* Remove explicit null declaration
* Update javadoc, generate adoc and PulsarEndpointBuilderFactory
---
.../src/main/docs/pulsar-component.adoc | 2 +-
.../camel/component/pulsar/PulsarProducer.java | 8 +-
.../pulsar/configuration/PulsarConfiguration.java | 6 +-
.../component/pulsar/PulsarComponentTest.java | 2 +-
.../PulsarProducerUndefinedProducerNameInTest.java | 111 +++++++++++++++++++++
.../endpoint/dsl/PulsarEndpointBuilderFactory.java | 3 +-
6 files changed, 122 insertions(+), 10 deletions(-)
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 3e19465..d2eb07a 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -98,7 +98,7 @@ with the following path and query parameters:
| *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
| *messageRouter* (producer) | Set a custom Message Router. | | MessageRouter
| *messageRoutingMode* (producer) | Set the message routing mode for the producer. | RoundRobinPartition | MessageRoutingMode
-| *producerName* (producer) | Name of the producer | default-producer | String
+| *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. | | String
| *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
| *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 5503949..efc78ba 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -57,10 +57,7 @@ public class PulsarProducer extends DefaultProducer {
final String topicUri = pulsarEndpoint.getUri();
PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
String producerName = configuration.getProducerName();
- if (producerName == null) {
- producerName = topicUri + "-" + Thread.currentThread().getId();
- }
- final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().producerName(producerName).topic(topicUri)
+ final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri)
.sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull())
.maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
.batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages())
@@ -70,6 +67,9 @@ public class PulsarProducer extends DefaultProducer {
} else {
producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
}
+ if (producerName != null) {
+ producerBuilder.producerName(producerName);
+ }
producer = producerBuilder.create();
}
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 50b9778..d6bdcb8 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -41,8 +41,8 @@ public class PulsarConfiguration {
private int consumerQueueSize = 10;
@UriParam(label = "consumer", defaultValue = "sole-consumer")
private String consumerName = "sole-consumer";
- @UriParam(label = "producer", defaultValue = "default-producer")
- private String producerName = "default-producer";
+ @UriParam(label = "producer")
+ private String producerName;
@UriParam(label = "consumer", defaultValue = "cons")
private String consumerNamePrefix = "cons";
@UriParam(label = "consumer", defaultValue = "false")
@@ -136,7 +136,7 @@ public class PulsarConfiguration {
}
/**
- * Name of the producer
+ * Name of the producer. If unset, lets Pulsar select a unique identifier.
*/
public void setProducerName(String producerName) {
this.producerName = producerName;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
index 258df10..6ef8983 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
@@ -60,7 +60,7 @@ public class PulsarComponentTest extends CamelTestSupport {
assertEquals("cons", endpoint.getPulsarConfiguration().getConsumerNamePrefix());
assertEquals(10, endpoint.getPulsarConfiguration().getConsumerQueueSize());
assertEquals(1, endpoint.getPulsarConfiguration().getNumberOfConsumers());
- assertEquals("default-producer", endpoint.getPulsarConfiguration().getProducerName());
+ assertNull(endpoint.getPulsarConfiguration().getProducerName());
assertEquals("subs", endpoint.getPulsarConfiguration().getSubscriptionName());
assertEquals(SubscriptionType.EXCLUSIVE, endpoint.getPulsarConfiguration().getSubscriptionType());
assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java
new file mode 100644
index 0000000..c65d9ad
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarProducerUndefinedProducerNameInTest extends PulsarTestSupport {
+
+ private static final String TOPIC_URI = "persistent://public/default/camel-producer-topic";
+
+ @Produce("direct:start1")
+ private ProducerTemplate producerTemplate1;
+
+ @Produce("direct:start2")
+ private ProducerTemplate producerTemplate2;
+
+ @EndpointInject("pulsar:" + TOPIC_URI
+ + "?numberOfConsumers=1"
+ + "&subscriptionType=Exclusive"
+ + "&subscriptionName=camel-subscription"
+ + "&consumerQueueSize=1"
+ + "&consumerName=camel-consumer"
+ )
+ private Endpoint pulsarEndpoint1;
+
+ @EndpointInject("pulsar:" + TOPIC_URI)
+ private Endpoint pulsarEndpoint2;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ from("direct:start1").to(pulsarEndpoint1);
+ from("direct:start2").to(pulsarEndpoint2);
+
+ from(pulsarEndpoint1).to(to);
+ }
+ };
+ }
+
+ @Override
+ protected Registry createCamelRegistry() throws Exception {
+ Registry registry = new SimpleRegistry();
+
+ registerPulsarBeans(registry);
+
+ return registry;
+ }
+
+ private void registerPulsarBeans(Registry registry) throws PulsarClientException {
+ PulsarClient pulsarClient = givenPulsarClient();
+ AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+ registry.bind("pulsarClient", pulsarClient);
+ PulsarComponent comp = new PulsarComponent(context);
+ comp.setAutoConfiguration(autoConfiguration);
+ comp.setPulsarClient(pulsarClient);
+ registry.bind("pulsar", comp);
+ }
+
+ private PulsarClient givenPulsarClient() throws PulsarClientException {
+ return new ClientBuilderImpl()
+ .serviceUrl(getPulsarBrokerUrl())
+ .ioThreads(1)
+ .listenerThreads(1)
+ .build();
+ }
+
+ @Test
+ public void testAMessageToRouteIsSentFromBothProducersAndThenConsumed() throws Exception {
+ to.expectedMessageCount(2);
+
+ producerTemplate1.sendBody("Test First");
+ producerTemplate2.sendBody("Test Second");
+
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+ }
+}
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index ace4f65..e5f4f00 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -717,7 +717,8 @@ public interface PulsarEndpointBuilderFactory {
return this;
}
/**
- * Name of the producer.
+ * Name of the producer. If unset, lets Pulsar select a unique
+ * identifier.
*
* The option is a: <code>java.lang.String</code> type.
*