You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/22 11:07:35 UTC

[camel] branch camel-3.x updated: Rethrow exceptions in Azure Service Bus consumer (#9593)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new a56b1976741 Rethrow exceptions in Azure Service Bus consumer (#9593)
a56b1976741 is described below

commit a56b19767413dc8f07861cfdf96748943321fb12
Author: Dylan Piergies <dy...@gmail.com>
AuthorDate: Wed Mar 22 11:07:27 2023 +0000

    Rethrow exceptions in Azure Service Bus consumer (#9593)
    
    * CAMEL-19155 Re-throw exceptions in `ServiceBusConsumer`
    
    If an exception occurred during Exchange processing, the exception must
    be rethrown in the message receive and error handlers to trigger the
    `ServiceBusReceiverAsyncClient` to abandon the message so that the
    message is requeued.
    
    * Revert "CAMEL-19155 Re-throw exceptions in `ServiceBusConsumer`"
    
    This reverts commit a2efbed012935b760d56dc935c4736c2bb303e81.
    
    * CAMEL-19155 - Explicitly complete/abandon Azure Service Bus messages
    
    Take explicit control of completing/abandoning messages when
    auto-complete is enabled. Letting the Service Bus client do this
    automatically completes messages even if the Exchange failed, because
    errors do not propagate back to the reactive message Publisher.
    
    * CAMEL-19155 - Add explanatory comment
    
    * CAMEL-19155 - Correct explanatory comment
    
    ---------
    
    Co-authored-by: Dylan Piergies <Dy...@tesco.com>
---
 .../azure/servicebus/ServiceBusConsumer.java       | 22 ++++++++++++++++++----
 .../servicebus/client/ServiceBusClientFactory.java |  7 ++++---
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
index fcf1f5c6385..167f303b9c6 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
@@ -30,14 +30,13 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.azure.servicebus.client.ServiceBusClientFactory;
 import org.apache.camel.component.azure.servicebus.client.ServiceBusReceiverAsyncClientWrapper;
 import org.apache.camel.component.azure.servicebus.operations.ServiceBusReceiverOperations;
-import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.ObjectHelper;
+import reactor.core.scheduler.Schedulers;
 
 public class ServiceBusConsumer extends DefaultConsumer {
 
-    private Synchronization onCompletion;
     private ServiceBusReceiverAsyncClientWrapper clientWrapper;
     private ServiceBusReceiverOperations operations;
 
@@ -55,7 +54,6 @@ public class ServiceBusConsumer extends DefaultConsumer {
     @Override
     protected void doInit() throws Exception {
         super.doInit();
-        onCompletion = new ConsumerOnCompletion();
     }
 
     @Override
@@ -139,7 +137,7 @@ public class ServiceBusConsumer extends DefaultConsumer {
 
     private void onEventListener(final ServiceBusReceivedMessage message) {
         final Exchange exchange = createServiceBusExchange(message);
-
+        final ConsumerOnCompletion onCompletion = new ConsumerOnCompletion(message);
         // add exchange callback
         exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
         // use default consumer callback
@@ -202,6 +200,19 @@ public class ServiceBusConsumer extends DefaultConsumer {
     }
 
     private class ConsumerOnCompletion extends SynchronizationAdapter {
+        private final ServiceBusReceivedMessage message;
+
+        public ConsumerOnCompletion(ServiceBusReceivedMessage message) {
+            this.message = message;
+        }
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            super.onComplete(exchange);
+            if (!getConfiguration().isDisableAutoComplete()) {
+                clientWrapper.complete(message).subscribeOn(Schedulers.boundedElastic()).subscribe();
+            }
+        }
 
         @Override
         public void onFailure(Exchange exchange) {
@@ -209,6 +220,9 @@ public class ServiceBusConsumer extends DefaultConsumer {
             if (cause != null) {
                 getExceptionHandler().handleException("Error during processing exchange.", exchange, cause);
             }
+            if (!getConfiguration().isDisableAutoComplete()) {
+                clientWrapper.abandon(message).subscribeOn(Schedulers.boundedElastic()).subscribe();
+            }
         }
     }
 }
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java
index 7292bc3dce4..0e2e1161b26 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java
@@ -79,9 +79,10 @@ public final class ServiceBusClientFactory {
             final ServiceBusClientBuilder busClientBuilder, final ServiceBusConfiguration configuration) {
         final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverClientBuilder = busClientBuilder.receiver();
 
-        if (configuration.isDisableAutoComplete()) {
-            receiverClientBuilder.disableAutoComplete();
-        }
+        // We handle auto-complete in the consumer: there is no way to propagate errors back to the reactive pipeline
+        // on which messages are published, so the client would complete a message even if an error occurred during
+        // Exchange processing.
+        receiverClientBuilder.disableAutoComplete();
 
         if (configuration.getServiceBusType() == ServiceBusType.queue) {
             return receiverClientBuilder.queueName(configuration.getTopicOrQueueName());