Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/11 22:47:18 UTC

incubator-gobblin git commit: [GOBBLIN-284] Add retry in SalesforceExtractor to handle transient ne…

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master becb2b786 -> c5e83a331


[GOBBLIN-284] Add retry in SalesforceExtractor to handle transient ne…

Closes #2137 from htran1/salesforce_fetch_fixes
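
For orientation: the fix wraps the per-batch fetch in a bounded retry loop; on
an IOException the result stream is reopened and already-processed records are
skipped before fetching resumes. A minimal standalone sketch of that flow (the
method names here are hypothetical stand-ins, not the actual
Gobblin/Salesforce API; see fetchResultBatchWithRetry() in the diff below):

    import java.io.IOException;

    public class RetryLoopSketch {
      private static final int RETRY_LIMIT = 5; // mirrors DEFAULT_FETCH_RETRY_LIMIT

      void fetchWithRetry() throws IOException {
        int retryCount = 0;
        boolean success = false;
        do {
          try {
            if (retryCount > 0) {
              reconnect(); // reopen the stream, skip already-processed records
            }
            fetchBatch();  // buffer up to one internal batch of records
            success = true;
          } catch (IOException e) {
            if (retryCount >= RETRY_LIMIT) {
              throw e;     // retries exhausted: surface the failure
            }
            retryCount++;
          }
        } while (!success);
      }

      void reconnect() throws IOException {}  // stand-in for reinitializeBufferedReader()
      void fetchBatch() throws IOException {} // stand-in for fetchResultBatch(...)
    }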


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c5e83a33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c5e83a33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c5e83a33

Branch: refs/heads/master
Commit: c5e83a3317e6d76c2399dd8d64d876a4247e25b6
Parents: becb2b7
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Oct 11 15:47:09 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Oct 11 15:47:09 2017 -0700

----------------------------------------------------------------------
 .../gobblin/salesforce/SalesforceExtractor.java | 183 ++++++++++++++-----
 1 file changed, 140 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c5e83a33/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 38b4d1b..0c16051 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -108,6 +108,8 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final int MAX_RETRY_INTERVAL_SECS = 600;
  // avoid using too many bulk API calls by allowing PK chunking only if max partitions is configured <= this
   private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
+  private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit";
+  private static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
 
   private boolean pullStatus = true;
   private String nextUrl;
@@ -124,10 +126,13 @@ public class SalesforceExtractor extends RestApiExtractor {
   private boolean newBulkResultSet = true;
   private int bulkRecordCount = 0;
   private int prevBulkRecordCount = 0;
+  private List<String> csvRecord;
 
   private final boolean pkChunking;
   private final int pkChunkingSize;
   private final SalesforceConnector sfConnector;
+  private final int fetchRetryLimit;
+  private final int batchSize;
 
   public SalesforceExtractor(WorkUnitState state) {
     super(state);
@@ -149,6 +154,13 @@ public class SalesforceExtractor extends RestApiExtractor {
     this.pkChunkingSize =
         Math.max(MIN_PK_CHUNKING_SIZE,
             Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
+
+    // Get batch size from .pull file
+    int tmpBatchSize = state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
+        ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);
+
+    this.batchSize = tmpBatchSize == 0 ? ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize;
+    this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
   }
 
   @Override
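
The two knobs read in the constructor above can be set per job. A hedged
sketch of wiring them onto a WorkUnitState, e.g. in a test (the retry-limit
key string is literal in this diff; the fetch-size key is Gobblin's standard
ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE):

    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.configuration.WorkUnitState;

    WorkUnitState state = new WorkUnitState();
    // maximum number of fetch retries per batch (defaults to 5)
    state.setProp("salesforce.fetchRetryLimit", 3);
    // internal buffering batch size; 0 falls back to DEFAULT_SOURCE_FETCH_SIZE
    state.setProp(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, 5000);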
@@ -581,7 +593,8 @@ public class SalesforceExtractor extends RestApiExtractor {
 
       // Get data from input stream
       // If bulk load is not finished, get data from the stream
-      if (!this.isBulkJobFinished()) {
+      // Skip empty result sets since they will cause the extractor to terminate early
+      while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
         rs = getBulkData();
       }
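
The if-to-while change in this hunk matters because an empty record set
returned to the caller signals end-of-data, so a Salesforce batch that happens
to contain no rows must be skipped rather than returned. In sketch form
(simplified from the lines above):

    // keep pulling until the bulk job finishes or a non-empty batch arrives
    RecordSetList<JsonElement> rs = null;
    while (!isBulkJobFinished() && (rs == null || rs.isEmpty())) {
      rs = getBulkData(); // may legitimately be empty for a given batch
    }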
 
@@ -775,6 +788,125 @@ public class SalesforceExtractor extends RestApiExtractor {
   }
 
   /**
+   * Get a buffered reader wrapping the query result stream for the result with the specified index
+   * @param index index into the {@link #bulkResultIdList}
+   * @return a {@link BufferedReader}
+   * @throws AsyncApiException
+   */
+  private BufferedReader getBulkBufferedReader(int index) throws AsyncApiException {
+    return new BufferedReader(new InputStreamReader(
+        this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkResultIdList.get(index).getBatchId(),
+            this.bulkResultIdList.get(index).getResultId()), ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+  }
+
+  /**
+   * Fetch records into a {@link RecordSetList} up to the configured batch size {@link #batchSize}. This batch is not
+   * the entire Salesforce result batch; it is an internal extractor batch that buffers a subset of the result stream
+   * from a single Salesforce batch for more efficient processing.
+   * @param rs the record set to fetch into
+   * @param initialRecordCount Initial record count to use. This should correspond to the number of records already in rs.
+   *                           This is used to limit the number of records returned in rs to {@link #batchSize}.
+   * @throws DataRecordException
+   * @throws IOException
+   */
+  private void fetchResultBatch(RecordSetList<JsonElement> rs, int initialRecordCount)
+      throws DataRecordException, IOException {
+    int recordCount = initialRecordCount;
+
+    // Stream the resultset through CSV reader to identify columns in each record
+    InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
+
+    // Get header if it is first run of a new resultset
+    if (this.isNewBulkResultSet()) {
+      this.bulkRecordHeader = reader.nextRecord();
+      this.bulkResultColumCount = this.bulkRecordHeader.size();
+      this.setNewBulkResultSet(false);
+    }
+
+    // Get record from CSV reader stream
+    while ((this.csvRecord = reader.nextRecord()) != null) {
+      // Convert CSV record to JsonObject
+      JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, this.csvRecord, this.bulkResultColumCount);
+      rs.add(jsonObject);
+      recordCount++;
+      this.bulkRecordCount++;
+
+      // Stop filling the record set once it reaches the batch size
+      if (recordCount >= batchSize) {
+        log.info("Total number of records processed so far: " + this.bulkRecordCount);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Reinitialize the state of {@link #bulkBufferedReader} to handle network disconnects
+   * @throws IOException
+   * @throws AsyncApiException
+   */
+  private void reinitializeBufferedReader() throws IOException, AsyncApiException {
+    // close reader and get a new input stream to reconnect to resolve intermittent network errors
+    this.bulkBufferedReader.close();
+    this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount - 1);
+
+    // if the result set is partially processed then we need to skip over processed records
+    if (!isNewBulkResultSet()) {
+      List<String> lastCsvRecord = null;
+      InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
+
+      // skip header
+      reader.nextRecord();
+
+      int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount;
+      log.info("Skipping {} records on retry: ", recordsToSkip);
+
+      for (int i = 0; i < recordsToSkip; i++) {
+        lastCsvRecord = reader.nextRecord();
+      }
+
+      // make sure the last record processed before the error was the last record skipped so that the next
+      // unprocessed record is processed in the next call to fetchResultBatch()
+      if (recordsToSkip > 0) {
+        if (!this.csvRecord.equals(lastCsvRecord)) {
+          throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetch a result batch with retry for network errors
+   * @param rs the {@link RecordSetList} to fetch into
+   */
+  private void fetchResultBatchWithRetry(RecordSetList<JsonElement> rs)
+      throws AsyncApiException, DataRecordException, IOException {
+    boolean success = false;
+    int retryCount = 0;
+    int recordCountBeforeFetch = this.bulkRecordCount;
+
+    do {
+      try {
+        // reinitialize the reader to establish a new connection to handle transient network errors
+        if (retryCount > 0) {
+          reinitializeBufferedReader();
+        }
+
+        // on retries there may already be records in rs, so pass the number of records as the initial count
+        fetchResultBatch(rs, this.bulkRecordCount - recordCountBeforeFetch);
+        success = true;
+      } catch (IOException e) {
+        if (retryCount < this.fetchRetryLimit) {
+          log.info("Exception while fetching data, retrying: " + e.getMessage(), e);
+          retryCount++;
+        } else {
+          log.error("Exception while fetching data: " + e.getMessage(), e);
+          throw e;
+        }
+      }
+    } while (!success);
+  }
+
+  /**
   * Get data from the bulk API input stream
   * @return record set with each record as a JsonObject
    */
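
To make the repositioning in reinitializeBufferedReader() concrete, here is a
small self-contained demo of the skip-and-verify step (plain java.io stands in
for the real InputStreamCSVReader over the reopened Salesforce result stream):

    import java.io.BufferedReader;
    import java.io.StringReader;

    public class RepositionSketch {
      public static void main(String[] args) throws Exception {
        // A result set of 4 data rows; assume the connection dropped after 2
        // rows were emitted, i.e. bulkRecordCount - prevBulkRecordCount == 2.
        String csv = "Id,Name\n1,a\n2,b\n3,c\n4,d\n";
        int recordsToSkip = 2;
        String lastProcessed = "2,b"; // last record handed out before the failure

        BufferedReader reader = new BufferedReader(new StringReader(csv));
        reader.readLine();            // skip the header, as the real code does
        String lastSkipped = null;
        for (int i = 0; i < recordsToSkip; i++) {
          lastSkipped = reader.readLine();
        }
        // Mirrors the guard in the diff: the last skipped record must equal the
        // last one processed, so the next read is the first unprocessed row.
        if (!lastProcessed.equals(lastSkipped)) {
          throw new RuntimeException("Repositioning did not land on the expected record");
        }
        System.out.println("Next unprocessed row: " + reader.readLine()); // 3,c
      }
    }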
@@ -796,14 +928,12 @@ public class SalesforceExtractor extends RestApiExtractor {
         if (this.bulkResultIdCount < this.bulkResultIdList.size()) {
           log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
           this.setNewBulkResultSet(true);
-          this.bulkBufferedReader =
-              new BufferedReader(
-                  new InputStreamReader(
-                      this.bulkConnection.getQueryResultStream(this.bulkJob.getId(),
-                          this.bulkResultIdList.get(this.bulkResultIdCount).getBatchId(),
-                          this.bulkResultIdList.get(this.bulkResultIdCount).getResultId()),
-                      ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
 
+          if (this.bulkBufferedReader != null) {
+            this.bulkBufferedReader.close();
+          }
+
+          this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount);
           this.bulkResultIdCount++;
           this.prevBulkRecordCount = bulkRecordCount;
         } else {
@@ -814,41 +944,8 @@ public class SalesforceExtractor extends RestApiExtractor {
         }
       }
 
-      // if Buffer stream has data then process the same
-
-      // Get batch size from .pull file
-      int batchSize = Utils.getAsInt(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE));
-      if (batchSize == 0) {
-        batchSize = ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE;
-      }
-
-      // Stream the resultset through CSV reader to identify columns in each record
-      InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
-
-      // Get header if it is first run of a new resultset
-      if (this.isNewBulkResultSet()) {
-        this.bulkRecordHeader = reader.nextRecord();
-        this.bulkResultColumCount = this.bulkRecordHeader.size();
-        this.setNewBulkResultSet(false);
-      }
-
-      List<String> csvRecord;
-      int recordCount = 0;
-
-      // Get record from CSV reader stream
-      while ((csvRecord = reader.nextRecord()) != null) {
-        // Convert CSV record to JsonObject
-        JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, csvRecord, this.bulkResultColumCount);
-        rs.add(jsonObject);
-        recordCount++;
-        this.bulkRecordCount++;
-
-        // Insert records in record set until it reaches the batch size
-        if (recordCount >= batchSize) {
-          log.info("Total number of records processed so far: " + this.bulkRecordCount);
-          break;
-        }
-      }
+      // fetch a batch of results with retry for network errors
+      fetchResultBatchWithRetry(rs);
 
     } catch (Exception e) {
       throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
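
A note on the retry semantics implemented above: retryCount starts at zero and
is incremented only when another attempt will actually be made, so with the
default salesforce.fetchRetryLimit of 5 the extractor makes up to six fetch
attempts in total (the initial attempt plus five retries) before rethrowing
the IOException. Records already added to rs by a partially successful attempt
are kept, and the retry resumes after them via the initialRecordCount argument
to fetchResultBatch().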