You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/11 22:47:18 UTC
incubator-gobblin git commit: [GOBBLIN-284] Add retry in SalesforceExtractor to handle transient network errors
Repository: incubator-gobblin
Updated Branches:
refs/heads/master becb2b786 -> c5e83a331
[GOBBLIN-284] Add retry in SalesforceExtractor to handle transient network errors
Closes #2137 from htran1/salesforce_fetch_fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c5e83a33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c5e83a33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c5e83a33
Branch: refs/heads/master
Commit: c5e83a3317e6d76c2399dd8d64d876a4247e25b6
Parents: becb2b7
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Oct 11 15:47:09 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Oct 11 15:47:09 2017 -0700
----------------------------------------------------------------------
.../gobblin/salesforce/SalesforceExtractor.java | 183 ++++++++++++++-----
1 file changed, 140 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c5e83a33/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 38b4d1b..0c16051 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -108,6 +108,8 @@ public class SalesforceExtractor extends RestApiExtractor {
private static final int MAX_RETRY_INTERVAL_SECS = 600;
// avoid using too many bulk API calls by only allowing PK chunking only if max partitions is configured <= this
private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
+ private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit";
+ private static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
private boolean pullStatus = true;
private String nextUrl;
@@ -124,10 +126,13 @@ public class SalesforceExtractor extends RestApiExtractor {
private boolean newBulkResultSet = true;
private int bulkRecordCount = 0;
private int prevBulkRecordCount = 0;
+ private List<String> csvRecord;
private final boolean pkChunking;
private final int pkChunkingSize;
private final SalesforceConnector sfConnector;
+ private final int fetchRetryLimit;
+ private final int batchSize;
public SalesforceExtractor(WorkUnitState state) {
super(state);
@@ -149,6 +154,13 @@ public class SalesforceExtractor extends RestApiExtractor {
this.pkChunkingSize =
Math.max(MIN_PK_CHUNKING_SIZE,
Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
+
+ // Get batch size from .pull file
+ int tmpBatchSize = state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
+ ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);
+
+ this.batchSize = tmpBatchSize == 0 ? ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize;
+ this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
}
@Override
@@ -581,7 +593,8 @@ public class SalesforceExtractor extends RestApiExtractor {
// Get data from input stream
// If bulk load is not finished, get data from the stream
- if (!this.isBulkJobFinished()) {
+ // Skip empty result sets since they will cause the extractor to terminate early
+ while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
rs = getBulkData();
}
@@ -775,6 +788,125 @@ public class SalesforceExtractor extends RestApiExtractor {
}
/**
+ * Open the query result stream for the bulk result at position {@code index} of {@link #bulkResultIdList} and
+ * wrap it in a {@link BufferedReader} decoded with {@link ConfigurationKeys#DEFAULT_CHARSET_ENCODING}.
+ * @param index index into {@link #bulkResultIdList}
+ * @return a {@link BufferedReader} over the query result stream
+ * @throws AsyncApiException if the bulk connection cannot provide the result stream
+ */
+ private BufferedReader getBulkBufferedReader(int index) throws AsyncApiException {
+   return new BufferedReader(
+       new InputStreamReader(
+           this.bulkConnection.getQueryResultStream(
+               this.bulkJob.getId(),
+               this.bulkResultIdList.get(index).getBatchId(),
+               this.bulkResultIdList.get(index).getResultId()),
+           ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+ }
+
+ /**
+  * Read records from {@link #bulkBufferedReader} into {@code rs} until the stream is exhausted or {@code rs}
+  * reaches {@link #batchSize} records. This is an extractor-internal buffering batch over the current Salesforce
+  * result stream, not a whole Salesforce result batch.
+  * <p>On the first call for a new result set the CSV header is consumed and cached in {@link #bulkRecordHeader}.
+  * Every record read is also kept in {@link #csvRecord}, and {@link #bulkRecordCount} is advanced per record.
+  * @param rs the record set to append to
+  * @param initialRecordCount number of records already present in {@code rs}; these count toward
+  *        {@link #batchSize}
+  * @throws DataRecordException if a record cannot be converted (NOTE(review): presumably raised by
+  *         {@code Utils.csvToJsonObject}; confirm)
+  * @throws IOException if reading the underlying stream fails
+  */
+ private void fetchResultBatch(RecordSetList<JsonElement> rs, int initialRecordCount)
+     throws DataRecordException, IOException {
+   // A fresh CSV reader over the shared buffered reader; the buffered reader carries the stream position.
+   InputStreamCSVReader csvReader = new InputStreamCSVReader(this.bulkBufferedReader);
+
+   // The header row only appears at the start of a result set.
+   if (this.isNewBulkResultSet()) {
+     this.bulkRecordHeader = csvReader.nextRecord();
+     this.bulkResultColumCount = this.bulkRecordHeader.size();
+     this.setNewBulkResultSet(false);
+   }
+
+   int buffered = initialRecordCount;
+   for (this.csvRecord = csvReader.nextRecord(); this.csvRecord != null;
+       this.csvRecord = csvReader.nextRecord()) {
+     rs.add(Utils.csvToJsonObject(this.bulkRecordHeader, this.csvRecord, this.bulkResultColumCount));
+     buffered++;
+     this.bulkRecordCount++;
+
+     // Stop once the internal batch is full; the remaining records are read on a later call.
+     if (buffered >= batchSize) {
+       log.info("Total number of records processed so far: " + this.bulkRecordCount);
+       break;
+     }
+   }
+ }
+
+ /**
+  * Reinitialize {@link #bulkBufferedReader} after a network error: open a fresh result stream for the result
+  * currently being read and, if that result set was already partially consumed, skip past the records that were
+  * already processed so the next {@link #fetchResultBatch} call continues where the failed one stopped.
+  * @throws IOException if reading from the new stream fails while skipping records
+  * @throws AsyncApiException if a new result stream cannot be obtained
+  * @throws IllegalStateException if, after skipping, the stream is not positioned at the last processed record
+  */
+ private void reinitializeBufferedReader() throws IOException, AsyncApiException {
+   // Close the old reader on a best-effort basis. Its connection is most likely already broken, and a failure
+   // here must not prevent opening a replacement stream (otherwise a whole retry attempt is wasted).
+   try {
+     this.bulkBufferedReader.close();
+   } catch (IOException e) {
+     log.warn("Failed to close bulk result reader before reconnecting: " + e.getMessage(), e);
+   }
+
+   // bulkResultIdCount is incremented right after a result stream is opened, so the active result is at count - 1.
+   this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount - 1);
+
+   // If the header has not been consumed yet, fetchResultBatch() will read it and there is nothing to skip.
+   if (isNewBulkResultSet()) {
+     return;
+   }
+
+   InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
+
+   // skip header; bulkRecordHeader from the first read is reused
+   reader.nextRecord();
+
+   // prevBulkRecordCount is snapshotted when this result set was opened, so the difference is the number of
+   // records already emitted from it.
+   int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount;
+   log.info("Skipping {} records on retry: ", recordsToSkip);
+
+   List<String> lastCsvRecord = null;
+   for (int i = 0; i < recordsToSkip; i++) {
+     lastCsvRecord = reader.nextRecord();
+   }
+
+   // The last record skipped must equal the last record processed before the error, so the next unprocessed
+   // record is the first one returned by the next fetchResultBatch() call. A null lastCsvRecord means the new
+   // stream ended early; comparing from lastCsvRecord avoids a NullPointerException when csvRecord is null.
+   if (recordsToSkip > 0 && (lastCsvRecord == null || !lastCsvRecord.equals(this.csvRecord))) {
+     throw new IllegalStateException("Repositioning after reconnecting did not point to the expected record");
+   }
+ }
+
+ /**
+ * Fetch a result batch with retry for network errors
+ * @param rs the {@link RecordSetList} to fetch into
+ */
+ private void fetchResultBatchWithRetry(RecordSetList<JsonElement> rs)
+ throws AsyncApiException, DataRecordException, IOException {
+ boolean success = false;
+ int retryCount = 0;
+ int recordCountBeforeFetch = this.bulkRecordCount;
+
+ do {
+ try {
+ // reinitialize the reader to establish a new connection to handle transient network errors
+ if (retryCount > 0) {
+ reinitializeBufferedReader();
+ }
+
+ // on retries there may already be records in rs, so pass the number of records as the initial count
+ fetchResultBatch(rs, this.bulkRecordCount - recordCountBeforeFetch);
+ success = true;
+ } catch (IOException e) {
+ if (retryCount < this.fetchRetryLimit) {
+ log.info("Exception while fetching data, retrying: " + e.getMessage(), e);
+ retryCount++;
+ } else {
+ log.error("Exception while fetching data: " + e.getMessage(), e);
+ throw e;
+ }
+ }
+ } while (!success);
+ }
+
+ /**
* Get data from the bulk api input stream
* @return record set with each record as a JsonObject
*/
@@ -796,14 +928,12 @@ public class SalesforceExtractor extends RestApiExtractor {
if (this.bulkResultIdCount < this.bulkResultIdList.size()) {
log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
this.setNewBulkResultSet(true);
- this.bulkBufferedReader =
- new BufferedReader(
- new InputStreamReader(
- this.bulkConnection.getQueryResultStream(this.bulkJob.getId(),
- this.bulkResultIdList.get(this.bulkResultIdCount).getBatchId(),
- this.bulkResultIdList.get(this.bulkResultIdCount).getResultId()),
- ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+ if (this.bulkBufferedReader != null) {
+ this.bulkBufferedReader.close();
+ }
+
+ this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount);
this.bulkResultIdCount++;
this.prevBulkRecordCount = bulkRecordCount;
} else {
@@ -814,41 +944,8 @@ public class SalesforceExtractor extends RestApiExtractor {
}
}
- // if Buffer stream has data then process the same
-
- // Get batch size from .pull file
- int batchSize = Utils.getAsInt(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE));
- if (batchSize == 0) {
- batchSize = ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE;
- }
-
- // Stream the resultset through CSV reader to identify columns in each record
- InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
-
- // Get header if it is first run of a new resultset
- if (this.isNewBulkResultSet()) {
- this.bulkRecordHeader = reader.nextRecord();
- this.bulkResultColumCount = this.bulkRecordHeader.size();
- this.setNewBulkResultSet(false);
- }
-
- List<String> csvRecord;
- int recordCount = 0;
-
- // Get record from CSV reader stream
- while ((csvRecord = reader.nextRecord()) != null) {
- // Convert CSV record to JsonObject
- JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, csvRecord, this.bulkResultColumCount);
- rs.add(jsonObject);
- recordCount++;
- this.bulkRecordCount++;
-
- // Insert records in record set until it reaches the batch size
- if (recordCount >= batchSize) {
- log.info("Total number of records processed so far: " + this.bulkRecordCount);
- break;
- }
- }
+ // fetch a batch of results with retry for network errors
+ fetchResultBatchWithRetry(rs);
} catch (Exception e) {
throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);