You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/04/07 15:12:52 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1101] (DSS-25241): Enhance bulk API retry for ExceededQuota

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5722a85  [GOBBLIN-1101] (DSS-25241): Enhance bulk API retry for ExceededQuota
5722a85 is described below

commit 5722a85f29b173f16bf3005d92e89c9951e0d771
Author: Alex Li <al...@linkedin.com>
AuthorDate: Tue Apr 7 08:12:45 2020 -0700

    [GOBBLIN-1101] (DSS-25241): Enhance bulk API retry for ExceededQuota
    
    GOBBLIN-1101 (DSS-25241): Enhance bulk API retry
    for ExceededQuota
    
    Throw a RuntimeException on
    InterruptedException
    
    fix typo
    
    restore DEFAULT_FETCH_RETRY_LIMIT
    
    Fix finding the root-cause exception
    
    trigger build again
    
    Add config keys for the retry sleep durations
    
    trigger again
    
    fix format
    
    Closes #2942 from arekusuri/GOBBLIN-1101
---
 .../gobblin/salesforce/BulkResultIterator.java     | 93 ++++++++++++++++++----
 .../gobblin/salesforce/ResultChainingIterator.java |  6 +-
 .../salesforce/SalesforceConfigurationKeys.java    |  7 ++
 .../gobblin/salesforce/SalesforceExtractor.java    | 11 ++-
 4 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
index 4178193..0f0aac3 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.salesforce;
 
 import com.google.gson.JsonElement;
+import com.sforce.async.AsyncApiException;
+import com.sforce.async.AsyncExceptionCode;
 import com.sforce.async.BulkConnection;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -30,6 +32,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
 import org.apache.gobblin.source.extractor.utils.Utils;
 
+
 /**
  * Iterator for fetching result file of Bulk API.
  */
@@ -42,10 +45,14 @@ public class BulkResultIterator implements Iterator<JsonElement> {
   private List<String> header;
   private int columnSize;
   private int lineCount = 0; // this is different than currentFileRowCount. cvs file has header
+  private long retryInterval;
+  private long retryExceedQuotaInterval;
   private List<String> preLoadedLine = null;
 
-  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit, long retryInterval, long retryExceedQuotaInterval) {
     log.info("create BulkResultIterator: " + fileIdVO);
+    this.retryInterval = retryInterval;
+    this.retryExceedQuotaInterval = retryExceedQuotaInterval;
     this.conn = conn;
     this.fileIdVO = fileIdVO;
     this.retryLimit = retryLimit;
@@ -62,24 +69,52 @@ public class BulkResultIterator implements Iterator<JsonElement> {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuota()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(retryExceedQuotaInterval);
+          executeCount--; // if the current exception is Quota Exceeded, keep trying forever
+        }
+        log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
+      } catch (Exception e) {
+        // Retry may resolve other exceptions.
+        rootCause = e;
+        threadSleep(retryInterval);
+        log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
       }
     }
-    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+    if (executeCount == 1) {
+      throw new RuntimeException("***Fetch***: Failed", rootCause);
+    } else {
+      throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", rootCause);
+    }
+  }
+
+  private void threadSleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (Exception e) {
+      log.error("--Failed to sleep--", e);
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -103,7 +138,10 @@ public class BulkResultIterator implements Iterator<JsonElement> {
     return jsonObject;
   }
 
-  private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
+  private InputStreamCSVReader openAndSeekCsvReader(Throwable rootCause) throws OpenAndSeekException {
+    while (rootCause != null && rootCause.getCause() != null) {
+      rootCause = rootCause.getCause(); // find the root cause
+    }
     String jobId = fileIdVO.getJobId();
     String batchId = fileIdVO.getBatchId();
     String resultId = fileIdVO.getResultId();
@@ -117,14 +155,17 @@ public class BulkResultIterator implements Iterator<JsonElement> {
       for (int j = 0; j < lineCount; j++) {
         lastSkippedLine = csvReader.nextRecord(); // skip these records
       }
-      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(
+          preLoadedLine))) {
         // check if last skipped line is same as the line before error
-        throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+        String msg = rootCause == null ? "null" : rootCause.getMessage();
+        throw new OpenAndSeekException("Failed to verify last skipped line - root cause [" + msg + "]", rootCause);
       }
       return csvReader;
-    } catch (Exception e) { // failed to open reader and skip lineCount lines
-      exceptionRetryFor = exceptionRetryFor != null? exceptionRetryFor : e;
-      throw new RuntimeException("Failed to [" + e.getMessage() + "] - retrying for => " , exceptionRetryFor);
+
+    } catch (Exception currentException) { // failed to open reader and skip lineCount lines // ssl failures go here
+      Throwable cause = rootCause == null ? currentException : rootCause;
+      throw new OpenAndSeekException("Failed to [" + cause.getMessage() + "]" , cause, currentException);
     }
   }
 
@@ -144,3 +185,25 @@ public class BulkResultIterator implements Iterator<JsonElement> {
     }
   }
 }
+
+
+class OpenAndSeekException extends Exception {
+  private boolean _isCurrentExceptionExceedQuota;
+  public OpenAndSeekException(String msg, Throwable rootCause) {
+    super(msg, rootCause);
+    if (rootCause instanceof AsyncApiException &&
+        ((AsyncApiException) rootCause).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+      _isCurrentExceptionExceedQuota = true;
+    }
+  }
+  public OpenAndSeekException(String msg, Throwable rootCause, Exception currentException) {
+    super(msg, rootCause);
+    if (currentException instanceof AsyncApiException &&
+        ((AsyncApiException) currentException).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+      _isCurrentExceptionExceedQuota = true;
+    }
+  }
+  public boolean isCurrentExceptionExceedQuota() {
+    return _isCurrentExceptionExceedQuota;
+  }
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
index fa17344..bab77e5 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
@@ -35,8 +35,10 @@ public class ResultChainingIterator implements Iterator<JsonElement> {
   private Iterator<JsonElement> iter;
   private int recordCount = 0;
 
-  public ResultChainingIterator(BulkConnection conn, List<FileIdVO> fileIdList, int retryLimit) {
-    Iterator<BulkResultIterator> iterOfFiles = fileIdList.stream().map(x -> new BulkResultIterator(conn, x, retryLimit)).iterator();
+  public ResultChainingIterator(BulkConnection conn, List<FileIdVO> fileIdList, int retryLimit,
+      long retryInterval, long retryExceedQuotaInterval) {
+    Iterator<BulkResultIterator> iterOfFiles = fileIdList.stream().map(x ->
+        new BulkResultIterator(conn, x, retryLimit, retryInterval, retryExceedQuotaInterval)).iterator();
     iter = Iterators.<JsonElement>concat(iterOfFiles);
   }
 
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
index 6d40421..b40e0e8 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
@@ -28,6 +28,13 @@ public final class SalesforceConfigurationKeys {
   public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
   public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
 
+  // bulk api retry sleep duration for avoid resource consuming peak.
+  public static final String RETRY_EXCEED_QUOTA_INTERVAL = "salesforce.retry.exceedQuotaInterval";
+  public static final long RETRY_EXCEED_QUOTA_INTERVAL_DEFAULT = 5 * 60 * 1000;
+
+  public static final String RETRY_INTERVAL = "salesforce.retry.interval";
+  public static final long RETRY_INTERVAL_DEFAULT = 1 * 60 * 1000;
+
   // pk-chunking
   public static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";
   public static final String BULK_TEST_BATCH_ID_LIST = "salesforce.bulk.testBatchIds";
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 4582b0b..2a5d8bc 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -114,10 +114,14 @@ public class SalesforceExtractor extends RestApiExtractor {
 
   private final int pkChunkingSize;
   private final SalesforceConnector sfConnector;
+
   private final int retryLimit;
+  private final long retryInterval;
+  private final long retryExceedQuotaInterval;
 
   private final boolean bulkApiUseQueryAll;
 
+
   public SalesforceExtractor(WorkUnitState state) {
     super(state);
 
@@ -128,6 +132,8 @@ public class SalesforceExtractor extends RestApiExtractor {
 
     this.bulkApiUseQueryAll = workUnitState.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL);
     this.retryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
+    this.retryInterval = workUnitState.getPropAsLong(RETRY_INTERVAL, RETRY_INTERVAL_DEFAULT);
+    this.retryExceedQuotaInterval = workUnitState.getPropAsLong(RETRY_EXCEED_QUOTA_INTERVAL, RETRY_EXCEED_QUOTA_INTERVAL_DEFAULT);
   }
 
   @Override
@@ -554,7 +560,7 @@ public class SalesforceExtractor extends RestApiExtractor {
     String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
     String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
     List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
-    return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+    return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit, retryInterval, retryExceedQuotaInterval);
   }
 
   private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
@@ -584,7 +590,8 @@ public class SalesforceExtractor extends RestApiExtractor {
       List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
           .map(x -> new FileIdVO(this.bulkJob.getId(), x.batchId, x.resultId))
           .collect(Collectors.toList());
-      ResultChainingIterator chainingIter = new ResultChainingIterator(this.bulkConnection, fileIdVoList, this.retryLimit);
+      ResultChainingIterator chainingIter = new ResultChainingIterator(
+          bulkConnection, fileIdVoList, retryLimit, retryInterval, retryExceedQuotaInterval);
       chainingIter.add(getSoftDeletedRecords(schema, entity, workUnit, predicateList));
       return chainingIter;
     } catch (Exception e) {