You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/04/07 15:12:52 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1101]
(DSS-25241): Enhance bulk API retry for ExceedQuota
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5722a85 [GOBBLIN-1101] (DSS-25241): Enhance bulk API retry for ExceedQuota
5722a85 is described below
commit 5722a85f29b173f16bf3005d92e89c9951e0d771
Author: Alex Li <al...@linkedin.com>
AuthorDate: Tue Apr 7 08:12:45 2020 -0700
[GOBBLIN-1101] (DSS-25241): Enhance bulk API retry for ExceedQuota
GOBBLIN-1101 (DSS-25241): Enhance bulk API retry
for ExceedQuota
throw a RuntimeException on
InterruptedException
fix typo
restore DEFAULT_FETCH_RETRY_LIMIT
fix find root cause exception
trigger build again
add key for sleep duration
trigger again
fix format
Closes #2942 from arekusuri/GOBBLIN-1101
---
.../gobblin/salesforce/BulkResultIterator.java | 93 ++++++++++++++++++----
.../gobblin/salesforce/ResultChainingIterator.java | 6 +-
.../salesforce/SalesforceConfigurationKeys.java | 7 ++
.../gobblin/salesforce/SalesforceExtractor.java | 11 ++-
4 files changed, 98 insertions(+), 19 deletions(-)
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
index 4178193..0f0aac3 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.salesforce;
import com.google.gson.JsonElement;
+import com.sforce.async.AsyncApiException;
+import com.sforce.async.AsyncExceptionCode;
import com.sforce.async.BulkConnection;
import java.io.BufferedReader;
import java.io.IOException;
@@ -30,6 +32,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
import org.apache.gobblin.source.extractor.utils.Utils;
+
/**
* Iterator for fetching result file of Bulk API.
*/
@@ -42,10 +45,14 @@ public class BulkResultIterator implements Iterator<JsonElement> {
private List<String> header;
private int columnSize;
private int lineCount = 0; // this is different than currentFileRowCount. cvs file has header
+ private long retryInterval;
+ private long retryExceedQuotaInterval;
private List<String> preLoadedLine = null;
- public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+ public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit, long retryInterval, long retryExceedQuotaInterval) {
log.info("create BulkResultIterator: " + fileIdVO);
+ this.retryInterval = retryInterval;
+ this.retryExceedQuotaInterval = retryExceedQuotaInterval;
this.conn = conn;
this.fileIdVO = fileIdVO;
this.retryLimit = retryLimit;
@@ -62,24 +69,52 @@ public class BulkResultIterator implements Iterator<JsonElement> {
}
private List<String> nextLineWithRetry() {
- Exception exception = null;
- for (int i = 0; i < retryLimit; i++) {
+ Throwable rootCause = null;
+ int executeCount = 0;
+ while (executeCount < retryLimit + 1) {
+ executeCount++;
try {
if (this.csvReader == null) {
- this.csvReader = openAndSeekCsvReader(null);
+ this.csvReader = openAndSeekCsvReader(rootCause);
}
List<String> line = this.csvReader.nextRecord();
this.lineCount++;
return line;
} catch (InputStreamCSVReader.CSVParseException e) {
throw new RuntimeException(e); // don't retry if it is parse error
- } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
- exception = e;
- log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
- this.csvReader = openAndSeekCsvReader(e);
+ } catch (OpenAndSeekException e) {
+ rootCause = e.getCause();
+ // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+ // any new synchronous Apex request results in a runtime exception.
+ if (e.isCurrentExceptionExceedQuota()) {
+ log.warn("--Caught ExceededQuota: " + e.getMessage());
+ threadSleep(retryExceedQuotaInterval);
+ executeCount--; // if the current exception is Quota Exceeded, keep trying forever
+ }
+ log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+ this.csvReader = null; // in next loop, call openAndSeekCsvReader
+ } catch (Exception e) {
+ // Retry may resolve other exceptions.
+ rootCause = e;
+ threadSleep(retryInterval);
+ log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
+ this.csvReader = null; // in next loop, call openAndSeekCsvReader
}
}
- throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+ if (executeCount == 1) {
+ throw new RuntimeException("***Fetch***: Failed", rootCause);
+ } else {
+ throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", rootCause);
+ }
+ }
+
+ private void threadSleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (Exception e) {
+ log.error("--Failed to sleep--", e);
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -103,7 +138,10 @@ public class BulkResultIterator implements Iterator<JsonElement> {
return jsonObject;
}
- private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
+ private InputStreamCSVReader openAndSeekCsvReader(Throwable rootCause) throws OpenAndSeekException {
+ while (rootCause != null && rootCause.getCause() != null) {
+ rootCause = rootCause.getCause(); // find the root cause
+ }
String jobId = fileIdVO.getJobId();
String batchId = fileIdVO.getBatchId();
String resultId = fileIdVO.getResultId();
@@ -117,14 +155,17 @@ public class BulkResultIterator implements Iterator<JsonElement> {
for (int j = 0; j < lineCount; j++) {
lastSkippedLine = csvReader.nextRecord(); // skip these records
}
- if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+ if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(
+ preLoadedLine))) {
// check if last skipped line is same as the line before error
- throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+ String msg = rootCause == null ? "null" : rootCause.getMessage();
+ throw new OpenAndSeekException("Failed to verify last skipped line - root cause [" + msg + "]", rootCause);
}
return csvReader;
- } catch (Exception e) { // failed to open reader and skip lineCount lines
- exceptionRetryFor = exceptionRetryFor != null? exceptionRetryFor : e;
- throw new RuntimeException("Failed to [" + e.getMessage() + "] - retrying for => " , exceptionRetryFor);
+
+ } catch (Exception currentException) { // failed to open reader and skip lineCount lines // ssl failures go here
+ Throwable cause = rootCause == null ? currentException : rootCause;
+ throw new OpenAndSeekException("Failed to [" + cause.getMessage() + "]" , cause, currentException);
}
}
@@ -144,3 +185,25 @@ public class BulkResultIterator implements Iterator<JsonElement> {
}
}
}
+
+
+class OpenAndSeekException extends Exception {
+ private boolean _isCurrentExceptionExceedQuota;
+ public OpenAndSeekException(String msg, Throwable rootCause) {
+ super(msg, rootCause);
+ if (rootCause instanceof AsyncApiException &&
+ ((AsyncApiException) rootCause).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+ _isCurrentExceptionExceedQuota = true;
+ }
+ }
+ public OpenAndSeekException(String msg, Throwable rootCause, Exception currentException) {
+ super(msg, rootCause);
+ if (currentException instanceof AsyncApiException &&
+ ((AsyncApiException) currentException).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+ _isCurrentExceptionExceedQuota = true;
+ }
+ }
+ public boolean isCurrentExceptionExceedQuota() {
+ return _isCurrentExceptionExceedQuota;
+ }
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
index fa17344..bab77e5 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
@@ -35,8 +35,10 @@ public class ResultChainingIterator implements Iterator<JsonElement> {
private Iterator<JsonElement> iter;
private int recordCount = 0;
- public ResultChainingIterator(BulkConnection conn, List<FileIdVO> fileIdList, int retryLimit) {
- Iterator<BulkResultIterator> iterOfFiles = fileIdList.stream().map(x -> new BulkResultIterator(conn, x, retryLimit)).iterator();
+ public ResultChainingIterator(BulkConnection conn, List<FileIdVO> fileIdList, int retryLimit,
+ long retryInterval, long retryExceedQuotaInterval) {
+ Iterator<BulkResultIterator> iterOfFiles = fileIdList.stream().map(x ->
+ new BulkResultIterator(conn, x, retryLimit, retryInterval, retryExceedQuotaInterval)).iterator();
iter = Iterators.<JsonElement>concat(iterOfFiles);
}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
index 6d40421..b40e0e8 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
@@ -28,6 +28,13 @@ public final class SalesforceConfigurationKeys {
public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
+ // bulk api retry sleep duration for avoid resource consuming peak.
+ public static final String RETRY_EXCEED_QUOTA_INTERVAL = "salesforce.retry.exceedQuotaInterval";
+ public static final long RETRY_EXCEED_QUOTA_INTERVAL_DEFAULT = 5 * 60 * 1000;
+
+ public static final String RETRY_INTERVAL = "salesforce.retry.interval";
+ public static final long RETRY_INTERVAL_DEFAULT = 1 * 60 * 1000;
+
// pk-chunking
public static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";
public static final String BULK_TEST_BATCH_ID_LIST = "salesforce.bulk.testBatchIds";
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 4582b0b..2a5d8bc 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -114,10 +114,14 @@ public class SalesforceExtractor extends RestApiExtractor {
private final int pkChunkingSize;
private final SalesforceConnector sfConnector;
+
private final int retryLimit;
+ private final long retryInterval;
+ private final long retryExceedQuotaInterval;
private final boolean bulkApiUseQueryAll;
+
public SalesforceExtractor(WorkUnitState state) {
super(state);
@@ -128,6 +132,8 @@ public class SalesforceExtractor extends RestApiExtractor {
this.bulkApiUseQueryAll = workUnitState.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL);
this.retryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
+ this.retryInterval = workUnitState.getPropAsLong(RETRY_INTERVAL, RETRY_INTERVAL_DEFAULT);
+ this.retryExceedQuotaInterval = workUnitState.getPropAsLong(RETRY_EXCEED_QUOTA_INTERVAL, RETRY_EXCEED_QUOTA_INTERVAL_DEFAULT);
}
@Override
@@ -554,7 +560,7 @@ public class SalesforceExtractor extends RestApiExtractor {
String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
- return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+ return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit, retryInterval, retryExceedQuotaInterval);
}
private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
@@ -584,7 +590,8 @@ public class SalesforceExtractor extends RestApiExtractor {
List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
.map(x -> new FileIdVO(this.bulkJob.getId(), x.batchId, x.resultId))
.collect(Collectors.toList());
- ResultChainingIterator chainingIter = new ResultChainingIterator(this.bulkConnection, fileIdVoList, this.retryLimit);
+ ResultChainingIterator chainingIter = new ResultChainingIterator(
+ bulkConnection, fileIdVoList, retryLimit, retryInterval, retryExceedQuotaInterval);
chainingIter.add(getSoftDeletedRecords(schema, entity, workUnit, predicateList));
return chainingIter;
} catch (Exception e) {