You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/02/24 16:17:30 UTC
[camel] 01/04: CAMEL-14607: support negative acknowledge in pulsar
message receipt
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2f48446d81cf9a76f7c99a066fd07cfec77bd6da
Author: connormcauliffe-toast <co...@toasttab.com>
AuthorDate: Fri Feb 21 11:50:25 2020 -0500
CAMEL-14607: support negative acknowledge in pulsar message receipt
---
.../pulsar/DefaultPulsarMessageReceipt.java | 2 +-
.../pulsar/PulsarConsumerAcknowledgementTest.java | 52 +++++++++++++++++-----
.../PulsarConsumerNoAcknowledgementTest.java | 21 ++++-----
.../pulsar/PulsarCustomMessageReceiptTest.java | 17 +++----
.../pulsar/PulsarNegativeAcknowledgementTest.java | 33 --------------
5 files changed, 63 insertions(+), 62 deletions(-)
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
index 6d3a1cd..45f6671 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
@@ -55,7 +55,7 @@ public class DefaultPulsarMessageReceipt implements PulsarMessageReceipt {
@Override
public void negativeAcknowledge() {
- throw new UnsupportedOperationException("Negative acknowledge is not supported in this version of the Pulsar client.");
+ consumer.negativeAcknowledge(messageId);
}
public Consumer getConsumer() {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 1d34ff1..27b2496 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -25,7 +25,8 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.pulsar.utils.AutoConfiguration;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
-import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -43,11 +44,11 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
private static final String TOPIC_URI = "persistent://public/default/camel-topic";
private static final String PRODUCER = "camel-producer-1";
- @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
private Endpoint from;
- @EndpointInject(uri = "mock:result")
+ @EndpointInject("mock:result")
private MockEndpoint to;
private Producer<String> producer;
@@ -59,23 +60,23 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
}
@Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry jndi = super.createRegistry();
+ protected Registry createCamelRegistry() throws Exception {
+ Registry registry = new SimpleRegistry();
- registerPulsarBeans(jndi);
+ registerPulsarBeans(registry);
- return jndi;
+ return registry;
}
- private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+ private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
PulsarClient pulsarClient = givenPulsarClient();
AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
- jndi.bind("pulsarClient", pulsarClient);
+ registry.bind("pulsarClient", pulsarClient);
PulsarComponent comp = new PulsarComponent(context);
comp.setAutoConfiguration(autoConfiguration);
comp.setPulsarClient(pulsarClient);
- jndi.bind("pulsar", comp);
+ registry.bind("pulsar", comp);
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
@@ -183,4 +184,35 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
}
+
+ @Test
+ public void testNegativeAcknowledge() throws Exception {
+ to.expectedMessageCount(2);
+ to.expectedBodiesReceived("Hello World!", "Hello World!");
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("myRoute").to(to).process(exchange -> {
+ LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+ if (!Boolean.parseBoolean(exchange.getProperty("processedOnce", String.class))) {
+ exchange.setProperty("processedOnce", "true");
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+ .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ receipt.negativeAcknowledge();
+ }
+ else {
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+ .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ receipt.acknowledge();
+ }
+ });
+ }
+ });
+
+ producer.newMessage().value("Hello World!").property("proccessedOnce", "false").send();
+
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+ }
}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
index f57b4a5..ccbec4f 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -23,7 +23,8 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.pulsar.utils.AutoConfiguration;
-import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,11 +37,11 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
private static final String TOPIC_URI = "persistent://public/default/camel-topic";
private static final String PRODUCER = "camel-producer-1";
- @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&ackTimeoutMillis=1000")
private Endpoint from;
- @EndpointInject(uri = "mock:result")
+ @EndpointInject("mock:result")
private MockEndpoint to;
@Override
@@ -55,26 +56,26 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
}
@Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry jndi = super.createRegistry();
+ protected Registry createCamelRegistry() throws Exception {
+ Registry registry = new SimpleRegistry();
- registerPulsarBeans(jndi);
+ registerPulsarBeans(registry);
- return jndi;
+ return registry;
}
- private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+ private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
PulsarClient pulsarClient = givenPulsarClient();
AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
- jndi.bind("pulsarClient", pulsarClient);
+ registry.bind("pulsarClient", pulsarClient);
PulsarComponent comp = new PulsarComponent(context);
comp.setAutoConfiguration(autoConfiguration);
comp.setPulsarClient(pulsarClient);
comp.setAllowManualAcknowledgement(true); // Set to true here instead of
// the endpoint query
// parameter.
- jndi.bind("pulsar", comp);
+ registry.bind("pulsar", comp);
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
index eea202e..3cdb832 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -24,7 +24,8 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.pulsar.utils.AutoConfiguration;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
-import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -68,25 +69,25 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
}
@Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry jndi = super.createRegistry();
+ protected Registry createCamelRegistry() throws Exception {
+ Registry registry = new SimpleRegistry();
- registerPulsarBeans(jndi);
+ registerPulsarBeans(registry);
- return jndi;
+ return registry;
}
- private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+ private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
PulsarClient pulsarClient = givenPulsarClient();
AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
- jndi.bind("pulsarClient", pulsarClient);
+ registry.bind("pulsarClient", pulsarClient);
PulsarComponent comp = new PulsarComponent(context);
comp.setAutoConfiguration(autoConfiguration);
comp.setPulsarClient(pulsarClient);
// Test adding a custom PulsarMessageReceiptFactory
comp.setPulsarMessageReceiptFactory(mockPulsarMessageReceiptFactory);
- jndi.bind("pulsar", comp);
+ registry.bind("pulsar", comp);
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
deleted file mode 100644
index abb76a0..0000000
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.pulsar;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class PulsarNegativeAcknowledgementTest {
-
- @Test(expected = UnsupportedOperationException.class)
- public void testNegativeAcknowledgement() {
- PulsarMessageReceipt receipt = new DefaultPulsarMessageReceipt(mock(Consumer.class), mock(MessageId.class));
- receipt.negativeAcknowledge();
- }
-
-}