You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/11/13 08:41:48 UTC

[camel] branch main updated: CAMEL-17185 Add an option for customizing retryable PubSub server errors (#6413)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e5e24c  CAMEL-17185 Add an option for customizing retryable PubSub server errors (#6413)
0e5e24c is described below

commit 0e5e24c9454057b9d1947db265690802b06d1e6e
Author: vpaturet <46...@users.noreply.github.com>
AuthorDate: Sat Nov 13 09:41:12 2021 +0100

    CAMEL-17185 Add an option for customizing retryable PubSub server errors (#6413)
    
    * Add an option for customizing retryable PubSub server errors
    
    * Expose the option for customizing retryable code as a string
---
 .../pubsub/GooglePubsubComponentConfigurer.java    |  6 +++++
 .../component/google/pubsub/google-pubsub.json     |  1 +
 .../google/pubsub/GooglePubsubComponent.java       | 29 ++++++++++++++++++++++
 .../dsl/GooglePubsubComponentBuilderFactory.java   | 18 ++++++++++++++
 4 files changed, 54 insertions(+)

diff --git a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
index e2b37f5..6543efb 100644
--- a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
+++ b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
@@ -37,6 +37,8 @@ public class GooglePubsubComponentConfigurer extends PropertyConfigurerSupport i
         case "publisherTerminationTimeout": target.setPublisherTerminationTimeout(property(camelContext, int.class, value)); return true;
         case "serviceaccountkey":
         case "serviceAccountKey": target.setServiceAccountKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "synchronouspullretryablecodes":
+        case "synchronousPullRetryableCodes": target.setSynchronousPullRetryableCodes(property(camelContext, java.lang.String.class, value)); return true;
         default: return false;
         }
     }
@@ -60,6 +62,8 @@ public class GooglePubsubComponentConfigurer extends PropertyConfigurerSupport i
         case "publisherTerminationTimeout": return int.class;
         case "serviceaccountkey":
         case "serviceAccountKey": return java.lang.String.class;
+        case "synchronouspullretryablecodes":
+        case "synchronousPullRetryableCodes": return java.lang.String.class;
         default: return null;
         }
     }
@@ -84,6 +88,8 @@ public class GooglePubsubComponentConfigurer extends PropertyConfigurerSupport i
         case "publisherTerminationTimeout": return target.getPublisherTerminationTimeout();
         case "serviceaccountkey":
         case "serviceAccountKey": return target.getServiceAccountKey();
+        case "synchronouspullretryablecodes":
+        case "synchronousPullRetryableCodes": return target.getSynchronousPullRetryableCodes();
         default: return null;
         }
     }
diff --git a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
index 12d1223..aaba5a0 100644
--- a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
+++ b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -26,6 +26,7 @@
     "endpoint": { "kind": "property", "displayName": "Endpoint", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Endpoint to use with local Pub\/Sub emulator." },
     "serviceAccountKey": { "kind": "property", "displayName": "Service Account Key", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different  [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
+    "synchronousPullRetryableCodes": { "kind": "property", "displayName": "Synchronous Pull Retryable Codes", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Comma-separated list of additional retryable error codes for synchronous pull. By default the PubSub client library retries ABORTED, UNAVAILABLE, UNKNOWN" },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...]
     "publisherCacheSize": { "kind": "property", "displayName": "Publisher Cache Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "Maximum number of producers to cache. This could be increased if you have producers for lots of different topics." },
     "publisherCacheTimeout": { "kind": "property", "displayName": "Publisher Cache Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "How many milliseconds should each producer stay alive in the cache." },
diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
index 1f9b69f..7a49337 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
@@ -17,15 +17,20 @@
 package org.apache.camel.component.google.pubsub;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.gax.core.NoCredentialsProvider;
 import com.google.api.gax.grpc.GrpcTransportChannel;
 import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.StatusCode;
 import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.auth.oauth2.ServiceAccountCredentials;
@@ -87,6 +92,11 @@ public class GooglePubsubComponent extends DefaultComponent {
               description = "How many milliseconds should a producer be allowed to terminate.")
     private int publisherTerminationTimeout = 60000;
 
+    @Metadata(
+              label = "consumer",
+              description = "Comma-separated list of additional retryable error codes for synchronous pull. By default the PubSub client library retries ABORTED, UNAVAILABLE, UNKNOWN")
+    private String synchronousPullRetryableCodes;
+
     private RemovalListener<String, Publisher> removalListener = removal -> {
         Publisher publisher = removal.getValue();
         if (publisher == null) {
@@ -183,6 +193,17 @@ public class GooglePubsubComponent extends DefaultComponent {
         SubscriberStubSettings.Builder builder = SubscriberStubSettings.newBuilder().setTransportChannelProvider(
                 SubscriberStubSettings.defaultGrpcTransportProviderBuilder().build());
 
+        if (synchronousPullRetryableCodes != null) {
+            // retrieve the default retryable codes and add the ones specified as a component option
+            Set<StatusCode.Code> retryableCodes = new HashSet<>(builder.pullSettings().getRetryableCodes());
+            Set<StatusCode.Code> customRetryableCodes = Stream.of(synchronousPullRetryableCodes.split(","))
+                    .map(String::trim)
+                    .map(StatusCode.Code::valueOf)
+                    .collect(Collectors.toSet());
+            retryableCodes.addAll(customRetryableCodes);
+            builder.pullSettings().setRetryableCodes(retryableCodes);
+        }
+
         if (StringHelper.trimToNull(endpoint) != null) {
             ManagedChannel channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
             TransportChannelProvider channelProvider
@@ -254,4 +275,12 @@ public class GooglePubsubComponent extends DefaultComponent {
     public void setServiceAccountKey(String serviceAccountKey) {
         this.serviceAccountKey = serviceAccountKey;
     }
+
+    public String getSynchronousPullRetryableCodes() {
+        return synchronousPullRetryableCodes;
+    }
+
+    public void setSynchronousPullRetryableCodes(String synchronousPullRetryableCodes) {
+        this.synchronousPullRetryableCodes = synchronousPullRetryableCodes;
+    }
 }
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
index 9f3b79b..c3daf2e 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
@@ -121,6 +121,23 @@ public interface GooglePubsubComponentBuilderFactory {
             return this;
         }
         /**
+         * Comma-separated list of additional retryable error codes for
+         * synchronous pull. By default the PubSub client library retries
+         * ABORTED, UNAVAILABLE, UNKNOWN.
+         * 
+         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
+         * 
+         * Group: consumer
+         * 
+         * @param synchronousPullRetryableCodes the value to set
+         * @return the dsl builder
+         */
+        default GooglePubsubComponentBuilder synchronousPullRetryableCodes(
+                java.lang.String synchronousPullRetryableCodes) {
+            doSetProperty("synchronousPullRetryableCodes", synchronousPullRetryableCodes);
+            return this;
+        }
+        /**
          * Whether the producer should be started lazy (on the first message).
          * By starting lazy you can use this to allow CamelContext and routes to
          * startup in situations where a producer may otherwise fail during
@@ -232,6 +249,7 @@ public interface GooglePubsubComponentBuilderFactory {
             case "endpoint": ((GooglePubsubComponent) component).setEndpoint((java.lang.String) value); return true;
             case "serviceAccountKey": ((GooglePubsubComponent) component).setServiceAccountKey((java.lang.String) value); return true;
             case "bridgeErrorHandler": ((GooglePubsubComponent) component).setBridgeErrorHandler((boolean) value); return true;
+            case "synchronousPullRetryableCodes": ((GooglePubsubComponent) component).setSynchronousPullRetryableCodes((java.lang.String) value); return true;
             case "lazyStartProducer": ((GooglePubsubComponent) component).setLazyStartProducer((boolean) value); return true;
             case "publisherCacheSize": ((GooglePubsubComponent) component).setPublisherCacheSize((int) value); return true;
             case "publisherCacheTimeout": ((GooglePubsubComponent) component).setPublisherCacheTimeout((int) value); return true;