You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/15 04:55:57 UTC
[camel] branch master updated: CAMEL-14184: Allow setting Pulsar
Message properties, event_time, key fields (#3340)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new e8e3b57 CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields (#3340)
e8e3b57 is described below
commit e8e3b57b4d51f4550e23e8d19f31c25697ee205c
Author: William Thompson <wi...@toasttab.com>
AuthorDate: Thu Nov 14 23:55:49 2019 -0500
CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields (#3340)
* Configurable Pulsar Message through Camel Exchange headers
* Fix docs typos
* Add missing license text
* Remove unused import
---
.../src/main/docs/pulsar-component.adoc | 31 +++++
.../camel/component/pulsar/PulsarProducer.java | 28 ++++-
.../pulsar/utils/message/PulsarMessageHeaders.java | 3 +
.../pulsar/PulsarProducerHeadersInTest.java | 127 +++++++++++++++++++++
4 files changed, 187 insertions(+), 2 deletions(-)
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index e763f6b..01b5bee 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -147,3 +147,34 @@ The component supports 8 options, which are listed below.
| *camel.component.pulsar.pulsar-message-receipt-factory* | Provide a factory to create an alternate implementation of PulsarMessageReceipt. The option is a org.apache.camel.component.pulsar.PulsarMessageReceiptFactory type. | | String
|===
// spring-boot-auto-configure options: END
+
+// message-headers options: START
+=== Message headers evaluated by the Pulsar producer
+
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|===
+| Header | Type | Description
+| `CamelPulsarProducerMessageKey` | `String` | Sets the key on the message for the Pulsar routing policy
+| `CamelPulsarProducerMessageProperties` | `Map<String,String>` | The properties to set on the Pulsar message
+| `CamelPulsarProducerEventTime` | `long` | Sets the event time on the message
+|===
+
+=== Message headers set by the Pulsar consumer
+
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|===
+| Header | Type | Description
+| `properties` | `Map<String,String>` | The properties from the Pulsar message or the empty Map if unset on the Pulsar message
+| `producer_name` | `String` | The name of the producer that created the message
+| `sequence_id` | `long` | Sequence identifier of the Pulsar message
+| `publish_time` | `long` | Time the Pulsar message was published to the topic
+| `message_id` | `MessageId` | Unique identifier of the message
+| `event_time` | `long` | The event time associated with the message or 0 if unset on the Pulsar message
+| `key` | `String` | The key of the Pulsar message in String form or the empty string if unset on the Pulsar message
+| `key_bytes` | `byte[]` | The bytes in the key. If the key has been base64 encoded, it is decoded before being returned. Otherwise, if the key is a plain string, the UTF-8 encoded bytes of the string.
+| `topic_name` | `String` | The topic to which the message was published
+| `manual_acknowledgement` | `PulsarManualAcknowledgement` | If allowManualAcknowledgement is set, this will contain the object for manually acknowledging the Pulsar message; otherwise it is unset
+|===
+// message-headers options: END
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index efc78ba..4db08ba 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.pulsar;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
@@ -23,11 +24,14 @@ import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConversionException;
import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
public class PulsarProducer extends DefaultProducer {
@@ -42,14 +46,34 @@ public class PulsarProducer extends DefaultProducer {
@Override
public void process(final Exchange exchange) throws Exception {
final Message message = exchange.getIn();
+
+ TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
byte[] body;
try {
- body = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, message.getBody());
+ body = exchange.getContext().getTypeConverter()
+ .mandatoryConvertTo(byte[].class, exchange, message.getBody());
} catch (NoTypeConversionAvailableException | TypeConversionException exception) {
// fallback to try serialize the data
body = PulsarMessageUtils.serialize(message.getBody());
}
- producer.send(body);
+ messageBuilder.value(body);
+
+ String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+ if (ObjectHelper.isNotEmpty(key)) {
+ messageBuilder.key(key);
+ }
+
+ Map<String, String> properties = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+ if (ObjectHelper.isNotEmpty(properties)) {
+ messageBuilder.properties(properties);
+ }
+
+ Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+ if (eventTime != null) {
+ messageBuilder.eventTime(eventTime);
+ }
+
+ messageBuilder.send();
}
private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
index a983564..978d78b 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
@@ -28,4 +28,7 @@ public interface PulsarMessageHeaders {
String KEY_BYTES = "key_bytes";
String TOPIC_NAME = "topic_name";
String MESSAGE_RECEIPT = "message_receipt";
+ String KEY_OUT = "CamelPulsarProducerMessageKey";
+ String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
+ String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java
new file mode 100644
index 0000000..21fd49c
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarProducerHeadersInTest extends PulsarTestSupport {
+
+ private static final String TOPIC_URI = "persistent://public/default/camel-producer-topic";
+ private static final String PRODUCER = "camel-producer";
+
+ @Produce("direct:start")
+ private ProducerTemplate producerTemplate;
+
+ @EndpointInject("pulsar:" + TOPIC_URI
+ + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ + "&subscriptionName=camel-subscription"
+ + "&consumerQueueSize=1"
+ + "&consumerName=camel-consumer"
+ + "&producerName=" + PRODUCER
+ )
+ private Endpoint pulsar;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint mock;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ from("direct:start").to(pulsar);
+ from(pulsar).to(mock);
+ }
+ };
+ }
+
+ @Override
+ protected Registry createCamelRegistry() throws Exception {
+ Registry registry = new SimpleRegistry();
+
+ registerPulsarBeans(registry);
+
+ return registry;
+ }
+
+ private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
+ PulsarClient pulsarClient = givenPulsarClient();
+ AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+ registry.bind("pulsarClient", pulsarClient);
+ PulsarComponent comp = new PulsarComponent(context);
+ comp.setAutoConfiguration(autoConfiguration);
+ comp.setPulsarClient(pulsarClient);
+ registry.bind("pulsar", comp);
+ }
+
+ private PulsarClient givenPulsarClient() throws PulsarClientException {
+ return new ClientBuilderImpl()
+ .serviceUrl(getPulsarBrokerUrl())
+ .ioThreads(1)
+ .listenerThreads(1)
+ .build();
+ }
+
+ @Test
+ public void propertyHeaderSetsPulsarProperties() throws InterruptedException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("testProperty", "testValue");
+ mock.expectedHeaderReceived(PulsarMessageHeaders.PROPERTIES, properties);
+
+ producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.PROPERTIES_OUT, properties);
+
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+ }
+
+ @Test
+ public void eventTimeHeaderSetsPulsarEventTime() throws InterruptedException {
+ long eventTime = 10000;
+ mock.expectedHeaderReceived(PulsarMessageHeaders.EVENT_TIME, eventTime);
+
+ producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.EVENT_TIME_OUT, eventTime);
+
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+ }
+
+ @Test
+ public void keyHeaderSetsPulsarKey() throws InterruptedException {
+ String key = "testKey";
+ mock.expectedHeaderReceived(PulsarMessageHeaders.KEY, key);
+
+ producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.KEY_OUT, key);
+
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+ }
+}