You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/11/04 09:10:35 UTC
[camel] branch main updated: CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1a5463a8d31 CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account
1a5463a8d31 is described below
commit 1a5463a8d31fa95f908be0a97a6ceab462fb488e
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Thu Sep 1 15:29:03 2022 +0200
CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account
---
.../camel/component/google/pubsub/GooglePubsubConsumer.java | 6 +++++-
.../component/google/pubsub/consumer/AcknowledgeSync.java | 10 ++++++----
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index e639e400e22..ef3dc08a443 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -192,8 +192,12 @@ public class GooglePubsubConsumer extends DefaultConsumer {
}
if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+ //existing subscriber can not be propagated, because it will be closed at the end of this block
+ //subscriber will be created at the moment of use
+ // (see https://issues.apache.org/jira/browse/CAMEL-18447)
exchange.adapt(ExtendedExchange.class)
- .addOnCompletion(new AcknowledgeSync(subscriber, subscriptionName));
+ .addOnCompletion(new AcknowledgeSync(
+ () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName));
}
try {
diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
index df4498607d8..0c2fa0528f3 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.google.pubsub.consumer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.pubsub.v1.AcknowledgeRequest;
@@ -28,11 +29,12 @@ import org.apache.camel.spi.Synchronization;
public class AcknowledgeSync implements Synchronization {
- private final SubscriberStub subscriber;
+ //Supplier cannot be used because of thrown exception (Callback used instead)
+ private final Callable<SubscriberStub> subscriberStubSupplier;
private final String subscriptionName;
- public AcknowledgeSync(SubscriberStub subscriber, String subscriptionName) {
- this.subscriber = subscriber;
+ public AcknowledgeSync(Callable<SubscriberStub> subscriberStubSupplier, String subscriptionName) {
+ this.subscriberStubSupplier = subscriberStubSupplier;
this.subscriptionName = subscriptionName;
}
@@ -41,7 +43,7 @@ public class AcknowledgeSync implements Synchronization {
AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder()
.addAllAckIds(getAckIdList(exchange))
.setSubscription(subscriptionName).build();
- try {
+ try (SubscriberStub subscriber = subscriberStubSupplier.call()) {
subscriber.acknowledgeCallable().call(ackRequest);
} catch (Exception e) {
throw new RuntimeCamelException(e);