You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/10/28 00:43:09 UTC

[gobblin] branch master updated: [GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (#3420)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fe8f97  [GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (#3420)
7fe8f97 is described below

commit 7fe8f975c050556e7f049474d7abc30d8f0a498b
Author: Jiashuo Wang <wi...@umich.edu>
AuthorDate: Wed Oct 27 17:43:01 2021 -0700

    [GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (#3420)
    
    * Exponential backoff for Salesforce bulk api polling
    
    * Read min and max wait times from job properties, falling back to defaults
---
 .../apache/gobblin/configuration/ConfigurationKeys.java    |  6 ++++++
 .../org/apache/gobblin/salesforce/SalesforceExtractor.java | 14 +++++++++++---
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c55b84c..07182a8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -318,6 +318,12 @@ public class ConfigurationKeys {
   public static final boolean DEFAULT_EXTRACT_LIMIT_ENABLED = false;
   public static final String EXTRACT_ID_TIME_ZONE = "extract.extractIdTimeZone";
   public static final String DEFAULT_EXTRACT_ID_TIME_ZONE = "UTC";
+  public static final String EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS_KEY =
+      "extract.salesforce.bulkApi.minWaitTimeInMillis";
+  public static final long DEFAULT_EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS = 60 * 1000L; // 1 min
+  public static final String EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS_KEY =
+      "extract.salesforce.bulkApi.maxWaitTimeInMillis";
+  public static final long DEFAULT_EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS = 10 * 60 * 1000L; // 10 min
 
   /**
    * Converter configuration properties.
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 2021ef8..f91ee01 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -835,17 +835,25 @@ public class SalesforceExtractor extends RestApiExtractor {
 
       BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
 
-      int waitMilliSeconds = 60 * 1000; // wait 1 minute
-
       // Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset)
       bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
 
       // wait for completion, failure, or formation of PK chunking batches
       // if it is InProgress or Queued, continue to wait.
+      int count = 0;
+      long minWaitTimeInMilliSeconds = super.workUnitState.getPropAsLong(
+          ConfigurationKeys.EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS_KEY,
+          ConfigurationKeys.DEFAULT_EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS);
+      long maxWaitTimeInMilliSeconds = super.workUnitState.getPropAsLong(
+          ConfigurationKeys.EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS_KEY,
+          ConfigurationKeys.DEFAULT_EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS);
       while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || bulkBatchInfo.getState() == BatchStateEnum.Queued) {
+        log.info("Waiting for bulk resultSetIds");
+        // Exponential backoff
+        long waitMilliSeconds = Math.min((long) (Math.pow(2, count) * minWaitTimeInMilliSeconds), maxWaitTimeInMilliSeconds);
         Thread.sleep(waitMilliSeconds);
         bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
-        log.info("Waiting for bulk resultSetIds");
+        count++;
       }
 
       // Wait for pk chunking batches