You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/02/24 16:17:30 UTC

[camel] 01/04: CAMEL-14607: support negative acknowledge in pulsar message receipt

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2f48446d81cf9a76f7c99a066fd07cfec77bd6da
Author: connormcauliffe-toast <co...@toasttab.com>
AuthorDate: Fri Feb 21 11:50:25 2020 -0500

    CAMEL-14607: support negative acknowledge in pulsar message receipt
---
 .../pulsar/DefaultPulsarMessageReceipt.java        |  2 +-
 .../pulsar/PulsarConsumerAcknowledgementTest.java  | 52 +++++++++++++++++-----
 .../PulsarConsumerNoAcknowledgementTest.java       | 21 ++++-----
 .../pulsar/PulsarCustomMessageReceiptTest.java     | 17 +++----
 .../pulsar/PulsarNegativeAcknowledgementTest.java  | 33 --------------
 5 files changed, 63 insertions(+), 62 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
index 6d3a1cd..45f6671 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
@@ -55,7 +55,7 @@ public class DefaultPulsarMessageReceipt implements PulsarMessageReceipt {
 
     @Override
     public void negativeAcknowledge() {
-        throw new UnsupportedOperationException("Negative acknowledge is not supported in this version of the Pulsar client.");
+        consumer.negativeAcknowledge(messageId);
     }
 
     public Consumer getConsumer() {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 1d34ff1..27b2496 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -25,7 +25,8 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.pulsar.utils.AutoConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
-import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -43,11 +44,11 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
                           + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
-    @EndpointInject(uri = "mock:result")
+    @EndpointInject("mock:result")
     private MockEndpoint to;
 
     private Producer<String> producer;
@@ -59,23 +60,23 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     }
 
     @Override
-    protected JndiRegistry createRegistry() throws Exception {
-        JndiRegistry jndi = super.createRegistry();
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
 
-        registerPulsarBeans(jndi);
+        registerPulsarBeans(registry);
 
-        return jndi;
+        return registry;
     }
 
-    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+    private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
         PulsarClient pulsarClient = givenPulsarClient();
         AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
 
-        jndi.bind("pulsarClient", pulsarClient);
+        registry.bind("pulsarClient", pulsarClient);
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
-        jndi.bind("pulsar", comp);
+        registry.bind("pulsar", comp);
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
@@ -183,4 +184,35 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
     }
+
+    @Test
+    public void testNegativeAcknowledge() throws Exception {
+        to.expectedMessageCount(2);
+        to.expectedBodiesReceived("Hello World!", "Hello World!");
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    if (!Boolean.parseBoolean(exchange.getProperty("processedOnce", String.class))) {
+                        exchange.setProperty("processedOnce", "true");
+                        PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                                .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                        receipt.negativeAcknowledge();
+                    }
+                    else {
+                        PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                                .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                        receipt.acknowledge();
+                    }
+                });
+            }
+        });
+
+        producer.newMessage().value("Hello World!").property("proccessedOnce", "false").send();
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
index f57b4a5..ccbec4f 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -23,7 +23,8 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.pulsar.utils.AutoConfiguration;
-import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,11 +37,11 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
                           + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
-    @EndpointInject(uri = "mock:result")
+    @EndpointInject("mock:result")
     private MockEndpoint to;
 
     @Override
@@ -55,26 +56,26 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
     }
 
     @Override
-    protected JndiRegistry createRegistry() throws Exception {
-        JndiRegistry jndi = super.createRegistry();
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
 
-        registerPulsarBeans(jndi);
+        registerPulsarBeans(registry);
 
-        return jndi;
+        return registry;
     }
 
-    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+    private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
         PulsarClient pulsarClient = givenPulsarClient();
         AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
 
-        jndi.bind("pulsarClient", pulsarClient);
+        registry.bind("pulsarClient", pulsarClient);
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
         comp.setAllowManualAcknowledgement(true); // Set to true here instead of
                                                   // the endpoint query
                                                   // parameter.
-        jndi.bind("pulsar", comp);
+        registry.bind("pulsar", comp);
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
index eea202e..3cdb832 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -24,7 +24,8 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.pulsar.utils.AutoConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
-import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -68,25 +69,25 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
     }
 
     @Override
-    protected JndiRegistry createRegistry() throws Exception {
-        JndiRegistry jndi = super.createRegistry();
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
 
-        registerPulsarBeans(jndi);
+        registerPulsarBeans(registry);
 
-        return jndi;
+        return registry;
     }
 
-    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+    private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
         PulsarClient pulsarClient = givenPulsarClient();
         AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
 
-        jndi.bind("pulsarClient", pulsarClient);
+        registry.bind("pulsarClient", pulsarClient);
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
         // Test adding a custom PulsarMessageReceiptFactory
         comp.setPulsarMessageReceiptFactory(mockPulsarMessageReceiptFactory);
-        jndi.bind("pulsar", comp);
+        registry.bind("pulsar", comp);
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
deleted file mode 100644
index abb76a0..0000000
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.pulsar;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class PulsarNegativeAcknowledgementTest {
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testNegativeAcknowledgement() {
-        PulsarMessageReceipt receipt = new DefaultPulsarMessageReceipt(mock(Consumer.class), mock(MessageId.class));
-        receipt.negativeAcknowledge();
-    }
-
-}