You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/11/04 09:10:35 UTC

[camel] branch main updated: CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a5463a8d31 CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account
1a5463a8d31 is described below

commit 1a5463a8d31fa95f908be0a97a6ceab462fb488e
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Thu Sep 1 15:29:03 2022 +0200

    CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account
---
 .../camel/component/google/pubsub/GooglePubsubConsumer.java    |  6 +++++-
 .../component/google/pubsub/consumer/AcknowledgeSync.java      | 10 ++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index e639e400e22..ef3dc08a443 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -192,8 +192,12 @@ public class GooglePubsubConsumer extends DefaultConsumer {
                         }
 
                         if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+                            //existing subscriber can not be propagated, because it will be closed at the end of this block
+                            //subscriber will be created at the moment of use
+                            // (see  https://issues.apache.org/jira/browse/CAMEL-18447)
                             exchange.adapt(ExtendedExchange.class)
-                                    .addOnCompletion(new AcknowledgeSync(subscriber, subscriptionName));
+                                    .addOnCompletion(new AcknowledgeSync(
+                                            () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName));
                         }
 
                         try {
diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
index df4498607d8..0c2fa0528f3 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.google.pubsub.consumer;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import com.google.cloud.pubsub.v1.stub.SubscriberStub;
 import com.google.pubsub.v1.AcknowledgeRequest;
@@ -28,11 +29,12 @@ import org.apache.camel.spi.Synchronization;
 
 public class AcknowledgeSync implements Synchronization {
 
-    private final SubscriberStub subscriber;
+    //Supplier cannot be used because of thrown exception (Callback used instead)
+    private final Callable<SubscriberStub> subscriberStubSupplier;
     private final String subscriptionName;
 
-    public AcknowledgeSync(SubscriberStub subscriber, String subscriptionName) {
-        this.subscriber = subscriber;
+    public AcknowledgeSync(Callable<SubscriberStub> subscriberStubSupplier, String subscriptionName) {
+        this.subscriberStubSupplier = subscriberStubSupplier;
         this.subscriptionName = subscriptionName;
     }
 
@@ -41,7 +43,7 @@ public class AcknowledgeSync implements Synchronization {
         AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder()
                 .addAllAckIds(getAckIdList(exchange))
                 .setSubscription(subscriptionName).build();
-        try {
+        try (SubscriberStub subscriber = subscriberStubSupplier.call()) {
             subscriber.acknowledgeCallable().call(ackRequest);
         } catch (Exception e) {
             throw new RuntimeCamelException(e);