You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by je...@apache.org on 2021/11/16 17:26:59 UTC

[camel] branch main updated: CAMEL-17195: camel-salesforce: Recover from invalid replayId

This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 8050b827 CAMEL-17195: camel-salesforce: Recover from invalid replayId
8050b827 is described below

commit 8050b82724b10b4de8cb22e16e4a513bf66f1eb9
Author: Jeremy Ross <je...@gmail.com>
AuthorDate: Tue Nov 16 11:22:09 2021 -0600

    CAMEL-17195: camel-salesforce: Recover from invalid replayId
    
    Implement fallBackReplayId to use in case of an invalid replay Id.
---
 .../salesforce/SalesforceComponentConfigurer.java  |  6 ++
 .../salesforce/SalesforceEndpointConfigurer.java   |  6 ++
 .../salesforce/SalesforceEndpointUriFactory.java   |  3 +-
 .../camel/component/salesforce/salesforce.json     |  2 +
 .../salesforce/SalesforceEndpointConfig.java       | 18 ++++
 .../internal/streaming/SubscriptionHelper.java     | 19 ++++-
 .../salesforce/StreamingApiIntegrationTest.java    | 20 ++++-
 .../dsl/SalesforceComponentBuilderFactory.java     | 17 ++++
 .../dsl/SalesforceEndpointBuilderFactory.java      | 98 ++++++++++++++++++++++
 9 files changed, 183 insertions(+), 6 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java
index 9144c49..1018e7c 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java
@@ -59,6 +59,8 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "contentType": getOrCreateConfig(target).setContentType(property(camelContext, org.apache.camel.component.salesforce.api.dto.bulk.ContentType.class, value)); return true;
         case "defaultreplayid":
         case "defaultReplayId": getOrCreateConfig(target).setDefaultReplayId(property(camelContext, java.lang.Long.class, value)); return true;
+        case "fallbackreplayid":
+        case "fallBackReplayId": getOrCreateConfig(target).setFallBackReplayId(property(camelContext, java.lang.Long.class, value)); return true;
         case "format": getOrCreateConfig(target).setFormat(property(camelContext, org.apache.camel.component.salesforce.internal.PayloadFormat.class, value)); return true;
         case "httpclient":
         case "httpClient": getOrCreateConfig(target).setHttpClient(property(camelContext, org.apache.camel.component.salesforce.SalesforceHttpClient.class, value)); return true;
@@ -237,6 +239,8 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "contentType": return org.apache.camel.component.salesforce.api.dto.bulk.ContentType.class;
         case "defaultreplayid":
         case "defaultReplayId": return java.lang.Long.class;
+        case "fallbackreplayid":
+        case "fallBackReplayId": return java.lang.Long.class;
         case "format": return org.apache.camel.component.salesforce.internal.PayloadFormat.class;
         case "httpclient":
         case "httpClient": return org.apache.camel.component.salesforce.SalesforceHttpClient.class;
@@ -416,6 +420,8 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "contentType": return getOrCreateConfig(target).getContentType();
         case "defaultreplayid":
         case "defaultReplayId": return getOrCreateConfig(target).getDefaultReplayId();
+        case "fallbackreplayid":
+        case "fallBackReplayId": return getOrCreateConfig(target).getFallBackReplayId();
         case "format": return getOrCreateConfig(target).getFormat();
         case "httpclient":
         case "httpClient": return getOrCreateConfig(target).getHttpClient();
diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
index 47c35d9..836f542 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
@@ -47,6 +47,8 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
         case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+        case "fallbackreplayid":
+        case "fallBackReplayId": target.getConfiguration().setFallBackReplayId(property(camelContext, java.lang.Long.class, value)); return true;
         case "format": target.getConfiguration().setFormat(property(camelContext, org.apache.camel.component.salesforce.internal.PayloadFormat.class, value)); return true;
         case "httpclient":
         case "httpClient": target.getConfiguration().setHttpClient(property(camelContext, org.apache.camel.component.salesforce.SalesforceHttpClient.class, value)); return true;
@@ -163,6 +165,8 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class;
         case "exchangepattern":
         case "exchangePattern": return org.apache.camel.ExchangePattern.class;
+        case "fallbackreplayid":
+        case "fallBackReplayId": return java.lang.Long.class;
         case "format": return org.apache.camel.component.salesforce.internal.PayloadFormat.class;
         case "httpclient":
         case "httpClient": return org.apache.camel.component.salesforce.SalesforceHttpClient.class;
@@ -280,6 +284,8 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
         case "exchangePattern": return target.getExchangePattern();
+        case "fallbackreplayid":
+        case "fallBackReplayId": return target.getConfiguration().getFallBackReplayId();
         case "format": return target.getConfiguration().getFormat();
         case "httpclient":
         case "httpClient": return target.getConfiguration().getHttpClient();
diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
index 1485f86..53e02a44 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(58);
+        Set<String> props = new HashSet<>(59);
         props.add("initialReplayIdMap");
         props.add("notifyForOperations");
         props.add("sObjectQuery");
@@ -53,6 +53,7 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo
         props.add("exceptionHandler");
         props.add("maxRecords");
         props.add("pkChunkingParent");
+        props.add("fallBackReplayId");
         props.add("batchId");
         props.add("notifyForOperationUndelete");
         props.add("apexUrl");
diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json
index 6db43a4..ae5a0ac 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json
@@ -30,6 +30,7 @@
     "batchId": { "kind": "property", "displayName": "Batch Id", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Bulk API Batch ID" },
     "contentType": { "kind": "property", "displayName": "Content Type", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.api.dto.bulk.ContentType", "enum": [ "XML", "CSV", "JSON", "ZIP_XML", "ZIP_CSV", "ZIP_JSON" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Bulk API co [...]
     "defaultReplayId": { "kind": "property", "displayName": "Default Replay Id", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "-1", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Default replayId setting if no value is found in initialReplayIdMap" },
+    "fallBackReplayId": { "kind": "property", "displayName": "Fall Back Replay Id", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "-1", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "ReplayId to fall back to after an Invalid Replay Id response" },
     "format": { "kind": "property", "displayName": "Format", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.internal.PayloadFormat", "enum": [ "JSON", "XML" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Payload format to use for Salesforce API calls, either JSON or XM [...]
     "httpClient": { "kind": "property", "displayName": "Http Client", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.SalesforceHttpClient", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Custom Jetty Http Client to use to connect to Salesforce." },
     "httpClientConnectionTimeout": { "kind": "property", "displayName": "Http Client Connection Timeout", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000, "description": "Connection timeout used by the HttpClient when connecting to the Salesforce server." },
@@ -123,6 +124,7 @@
     "batchId": { "kind": "parameter", "displayName": "Batch Id", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Bulk API Batch ID" },
     "contentType": { "kind": "parameter", "displayName": "Content Type", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.api.dto.bulk.ContentType", "enum": [ "XML", "CSV", "JSON", "ZIP_XML", "ZIP_CSV", "ZIP_JSON" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Bul [...]
     "defaultReplayId": { "kind": "parameter", "displayName": "Default Replay Id", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "-1", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Default replayId setting if no value is found in initialReplayIdMap" },
+    "fallBackReplayId": { "kind": "parameter", "displayName": "Fall Back Replay Id", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "-1", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "ReplayId to fall back to after an Invalid Replay Id response" },
     "format": { "kind": "parameter", "displayName": "Format", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.internal.PayloadFormat", "enum": [ "JSON", "XML" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Payload format to use for Salesforce API calls, either JS [...]
     "httpClient": { "kind": "parameter", "displayName": "Http Client", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.SalesforceHttpClient", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Custom Jetty Http Client to use to connect to Salesforce." },
     "includeDetails": { "kind": "parameter", "displayName": "Include Details", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Include details in Salesforce1 Analytics report, defaults to false." },
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index 57d63d8..a27bd98 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -90,6 +90,7 @@ public class SalesforceEndpointConfig implements Cloneable {
 
     // parameters for Streaming API
     public static final String DEFAULT_REPLAY_ID = "defaultReplayId";
+    public static final String FALL_BACK_REPLAY_ID = "fallBackReplayId";
     public static final String INITIAL_REPLAY_ID_MAP = "initialReplayIdMap";
     public static final long REPLAY_FROM_TIP = -1L;
 
@@ -205,6 +206,11 @@ public class SalesforceEndpointConfig implements Cloneable {
     @UriParam(description = "Default replayId setting if no value is found in initialReplayIdMap",
               defaultValue = "" + REPLAY_FROM_TIP)
     private Long defaultReplayId = REPLAY_FROM_TIP;
+
+    @UriParam(description = "ReplayId to fall back to after an Invalid Replay Id response",
+              defaultValue = "" + REPLAY_FROM_TIP)
+    private Long fallBackReplayId = REPLAY_FROM_TIP;
+
     @UriParam
     private Map<String, Long> initialReplayIdMap;
 
@@ -789,6 +795,7 @@ public class SalesforceEndpointConfig implements Cloneable {
 
         // add streaming API properties
         valueMap.put(DEFAULT_REPLAY_ID, defaultReplayId);
+        valueMap.put(FALL_BACK_REPLAY_ID, fallBackReplayId);
         valueMap.put(INITIAL_REPLAY_ID_MAP, initialReplayIdMap);
 
         valueMap.put(NOT_FOUND_BEHAVIOUR, notFoundBehaviour);
@@ -825,6 +832,17 @@ public class SalesforceEndpointConfig implements Cloneable {
         this.initialReplayIdMap = initialReplayIdMap;
     }
 
+    public Long getFallBackReplayId() {
+        return fallBackReplayId;
+    }
+
+    /**
+     * ReplayId to fall back to after an Invalid Replay Id response
+     */
+    public void setFallBackReplayId(Long fallBackReplayId) {
+        this.fallBackReplayId = fallBackReplayId;
+    }
+
     public Integer getLimit() {
         return limit;
     }
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index c3831ba..694a2de 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -69,6 +69,7 @@ public class SubscriptionHelper extends ServiceSupport {
     private static final int DISCONNECT_INTERVAL = 5000;
     private static final String SERVER_TOO_BUSY_ERROR = "503::";
     private static final String AUTHENTICATION_INVALID = "401::Authentication invalid";
+    private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId \\{.*} you provided was invalid.*";
 
     BayeuxClient client;
 
@@ -319,7 +320,7 @@ public class SubscriptionHelper extends ServiceSupport {
         if (message.get(EXCEPTION_FIELD) != null) {
             exception = (Exception) message.get(EXCEPTION_FIELD);
         } else if (message.get(FAILURE_FIELD) != null) {
-            exception = (Exception) ((Map<String, Object>) message.get("failure")).get("exception");
+            exception = (Exception) ((Map<String, Object>) message.get(FAILURE_FIELD)).get("exception");
         } else {
             String failureReason = getFailureReason(message);
             if (failureReason != null) {
@@ -414,10 +415,16 @@ public class SubscriptionHelper extends ServiceSupport {
     }
 
     public void subscribe(final String topicName, final SalesforceConsumer consumer) {
+        subscribe(topicName, consumer, false);
+    }
+
+    public void subscribe(
+            final String topicName, final SalesforceConsumer consumer,
+            final boolean skipReplayId) {
         // create subscription for consumer
         final String channelName = getChannelName(topicName);
 
-        if (!reconnecting) {
+        if (!reconnecting && !skipReplayId) {
             setupReplay((SalesforceEndpoint) consumer.getEndpoint());
         }
 
@@ -470,6 +477,14 @@ public class SubscriptionHelper extends ServiceSupport {
                                     LOG.warn("Aborting subscribe on interrupt!", e);
                                 }
                             }
+                        } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
+                            abort = false;
+                            final Long fallBackReplayId
+                                    = ((SalesforceEndpoint) consumer.getEndpoint()).getConfiguration().getFallBackReplayId();
+                            LOG.warn(error);
+                            LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
+                            REPLAY_EXTENSION.addChannelReplayId(channelName, fallBackReplayId);
+                            subscribe(topicName, consumer, true);
                         }
 
                         if (abort && client != null) {
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
index 6cf6210..30de829 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
@@ -48,6 +48,11 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
         rawPayloadMock.expectedHeaderReceived("CamelSalesforceTopicName", "CamelTestTopic");
         rawPayloadMock.expectedHeaderReceived("CamelSalesforceChannel", "/topic/CamelTestTopic");
 
+        MockEndpoint fallbackMock = getMockEndpoint("mock:CamelFallbackTestTopic");
+        fallbackMock.expectedMessageCount(1);
+        fallbackMock.expectedHeaderReceived("CamelSalesforceTopicName", "CamelFallbackTestTopic");
+        fallbackMock.expectedHeaderReceived("CamelSalesforceChannel", "/topic/CamelFallbackTestTopic");
+
         Merchandise__c merchandise = new Merchandise__c();
         merchandise.setName("TestNotification");
         merchandise.setDescription__c("Merchandise for testing Streaming API updated on " +
@@ -78,6 +83,9 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
             final Message inRaw = rawPayloadMock.getExchanges().get(0).getIn();
             assertTrue(inRaw.getBody() instanceof String, "Expected String message body for Raw Payload");
 
+            // validate fallback
+            fallbackMock.assertIsSatisfied();
+
         } finally {
             // remove the test record
             template().requestBody("direct:deleteSObjectWithId", merchandise);
@@ -85,8 +93,9 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
             // remove the test topic
             // find it using SOQL first
             QueryRecordsPushTopic records = template().requestBody("direct:query", null, QueryRecordsPushTopic.class);
-            assertEquals(1, records.getTotalSize(), "Test topic not found");
+            assertEquals(2, records.getTotalSize(), "Test topics not found");
             template().requestBody("direct:deleteSObject", records.getRecords().get(0));
+            template().requestBody("direct:deleteSObject", records.getRecords().get(1));
 
         }
     }
@@ -98,7 +107,6 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
             public void configure() throws Exception {
 
                 // test topic subscription
-                // from("salesforce:CamelTestTopic?notifyForFields=ALL&notifyForOperations=ALL&"
                 from("salesforce:CamelTestTopic?notifyForFields=ALL&"
                      + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&"
                      + "sObjectName=Merchandise__c&" + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c")
@@ -108,12 +116,18 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
                      + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&"
                      + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c").to("mock:RawPayloadCamelTestTopic");
 
+                from("salesforce:CamelFallbackTestTopic?notifyForFields=ALL&defaultReplayId=9999&"
+                     + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&"
+                     + "sObjectName=Merchandise__c&" + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c")
+                             .to("mock:CamelFallbackTestTopic");
+
                 // route for creating test record
                 from("direct:upsertSObject").to("salesforce:upsertSObject?sObjectIdName=Name");
 
                 // route for finding test topic
                 from("direct:query")
-                        .to("salesforce:query?sObjectQuery=SELECT Id FROM PushTopic WHERE Name = 'CamelTestTopic'&"
+                        .to("salesforce:query?sObjectQuery=SELECT Id FROM PushTopic " +
+                            "WHERE Name IN ('CamelTestTopic', 'CamelFallbackTestTopic')&"
                             + "sObjectClass=" + QueryRecordsPushTopic.class.getName());
 
                 // route for removing test record
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SalesforceComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SalesforceComponentBuilderFactory.java
index c0993b0..e6e5437 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SalesforceComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SalesforceComponentBuilderFactory.java
@@ -161,6 +161,22 @@ public interface SalesforceComponentBuilderFactory {
             return this;
         }
         /**
+         * ReplayId to fall back to after an Invalid Replay Id response.
+         * 
+         * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: -1
+         * Group: common
+         * 
+         * @param fallBackReplayId the value to set
+         * @return the dsl builder
+         */
+        default SalesforceComponentBuilder fallBackReplayId(
+                java.lang.Long fallBackReplayId) {
+            doSetProperty("fallBackReplayId", fallBackReplayId);
+            return this;
+        }
+        /**
          * Payload format to use for Salesforce API calls, either JSON or XML,
          * defaults to JSON. As of Camel 3.12, this option only applies to the
          * Raw operation.
@@ -1543,6 +1559,7 @@ public interface SalesforceComponentBuilderFactory {
             case "batchId": getOrCreateConfiguration((SalesforceComponent) component).setBatchId((java.lang.String) value); return true;
             case "contentType": getOrCreateConfiguration((SalesforceComponent) component).setContentType((org.apache.camel.component.salesforce.api.dto.bulk.ContentType) value); return true;
             case "defaultReplayId": getOrCreateConfiguration((SalesforceComponent) component).setDefaultReplayId((java.lang.Long) value); return true;
+            case "fallBackReplayId": getOrCreateConfiguration((SalesforceComponent) component).setFallBackReplayId((java.lang.Long) value); return true;
             case "format": getOrCreateConfiguration((SalesforceComponent) component).setFormat((org.apache.camel.component.salesforce.internal.PayloadFormat) value); return true;
             case "httpClient": getOrCreateConfiguration((SalesforceComponent) component).setHttpClient((org.apache.camel.component.salesforce.SalesforceHttpClient) value); return true;
             case "httpClientConnectionTimeout": ((SalesforceComponent) component).setHttpClientConnectionTimeout((long) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java
index c022555..aff2df0 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java
@@ -217,6 +217,39 @@ public interface SalesforceEndpointBuilderFactory {
             return this;
         }
         /**
+         * ReplayId to fall back to after an Invalid Replay Id response.
+         * 
+         * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: -1
+         * Group: common
+         * 
+         * @param fallBackReplayId the value to set
+         * @return the dsl builder
+         */
+        default SalesforceEndpointConsumerBuilder fallBackReplayId(
+                Long fallBackReplayId) {
+            doSetProperty("fallBackReplayId", fallBackReplayId);
+            return this;
+        }
+        /**
+         * ReplayId to fall back to after an Invalid Replay Id response.
+         * 
+         * The option will be converted to a
+         * &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: -1
+         * Group: common
+         * 
+         * @param fallBackReplayId the value to set
+         * @return the dsl builder
+         */
+        default SalesforceEndpointConsumerBuilder fallBackReplayId(
+                String fallBackReplayId) {
+            doSetProperty("fallBackReplayId", fallBackReplayId);
+            return this;
+        }
+        /**
          * Payload format to use for Salesforce API calls, either JSON or XML,
          * defaults to JSON. As of Camel 3.12, this option only applies to the
          * Raw operation.
@@ -1481,6 +1514,39 @@ public interface SalesforceEndpointBuilderFactory {
             return this;
         }
         /**
+         * ReplayId to fall back to after an Invalid Replay Id response.
+         * 
+         * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: -1
+         * Group: common
+         * 
+         * @param fallBackReplayId the value to set
+         * @return the dsl builder
+         */
+        default SalesforceEndpointProducerBuilder fallBackReplayId(
+                Long fallBackReplayId) {
+            doSetProperty("fallBackReplayId", fallBackReplayId);
+            return this;
+        }
+        /**
+         * ReplayId to fall back to after an Invalid Replay Id response.
+         * 
+         * The option will be converted to a
+         * &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: -1
+         * Group: common
+         * 
+         * @param fallBackReplayId the value to set
+         * @return the dsl builder
+         */
+        default SalesforceEndpointProducerBuilder fallBackReplayId(
+                String fallBackReplayId) {
+            doSetProperty("fallBackReplayId", fallBackReplayId);
+            return this;
+        }
+        /**
          * Payload format to use for Salesforce API calls, either JSON or XML,
          * defaults to JSON. As of Camel 3.12, this option only applies to the
          * Raw operation.
@@ -2770,6 +2836,38 @@ public interface SalesforceEndpointBuilderFactory {
             return this;
         }
         /**
+         * ReplayId to fall back to after an Invalid Replay Id response.
+         * 
+         * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: -1
+         * Group: common
+         * 
+         * @param fallBackReplayId the value to set
+         * @return the dsl builder
+         */
+        default SalesforceEndpointBuilder fallBackReplayId(Long fallBackReplayId) {
+            doSetProperty("fallBackReplayId", fallBackReplayId);
+            return this;
+        }
+        /**
+         * ReplayId to fall back to after an Invalid Replay Id response.
+         * 
+         * The option will be converted to a
+         * &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: -1
+         * Group: common
+         * 
+         * @param fallBackReplayId the value to set
+         * @return the dsl builder
+         */
+        default SalesforceEndpointBuilder fallBackReplayId(
+                String fallBackReplayId) {
+            doSetProperty("fallBackReplayId", fallBackReplayId);
+            return this;
+        }
+        /**
          * Payload format to use for Salesforce API calls, either JSON or XML,
          * defaults to JSON. As of Camel 3.12, this option only applies to the
          * Raw operation.