Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/30 22:29:45 UTC

[GitHub] [incubator-gobblin] arekusuri opened a new pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

arekusuri opened a new pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942
 
 
   ### JIRA
   https://issues.apache.org/jira/browse/GOBBLIN-1101
   
   ### Description
   1. ExceedQuota exception
   If an ExceedQuota exception occurs, the thread should sleep for 5 minutes and try again. The retryLimit should not apply to this exception.
   2. Exception stack in the log file
   For example, if retryLimit is set to 10 and all 10 retries fail, the exception stack printed to the log file contains 10 nested exceptions. We should skip the retry exceptions and keep only the root cause exception.
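
The two points above can be sketched roughly as follows. This is a minimal illustration, not the code in this pull request: `QuotaExceededException`, `QuotaAwareRetry`, and the injected `sleeper` are hypothetical names, and the sketch keeps only the most recent underlying failure as the cause of the final exception.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

// Hypothetical stand-in for a quota error; not a Gobblin or Salesforce class.
class QuotaExceededException extends Exception {
  QuotaExceededException(String msg) {
    super(msg);
  }
}

class QuotaAwareRetry {
  // Runs the task at most retryLimit + 1 times for ordinary failures.
  // Quota failures wait and are not counted against the limit.
  static <T> T call(Callable<T> task, int retryLimit, long quotaWaitMillis, LongConsumer sleeper) {
    Throwable rootCause = null;
    int executeCount = 0;
    while (executeCount < retryLimit + 1) {
      executeCount++;
      try {
        return task.call();
      } catch (QuotaExceededException e) {
        rootCause = e;
        sleeper.accept(quotaWaitMillis); // wait before trying again
        executeCount--; // do not count this attempt
      } catch (Exception e) {
        rootCause = e; // keep a single cause instead of nesting every retry
      }
    }
    throw new RuntimeException("Failed after " + executeCount + " attempt(s)", rootCause);
  }
}
```

Passing the sleep function in as a parameter is only a convenience for the sketch; it lets the loop be exercised without real waiting.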
   
   ### Tests
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400587342
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
+        }
+        log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
+      } catch (Exception e) {
+        // Retry may resolve other exceptions.
+        rootCause = e;
+        threadSleep(1 * 60 * 1000); // 1 minute
+        log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
       }
     }
-    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+    if (executeCount == 1) {
+      throw new RuntimeException("***Fetch***: Failed", rootCause);
+    } else {
+      throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", rootCause);
+    }
+  }
+
+  private void threadSleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (Exception ee) {
+      log.warn("--sleep exception--");
 
 Review comment:
   Changed to:
   ```
   try {
     Thread.sleep(millis);
   } catch (Exception e) {
     log.error("--Failed to sleep--", e);
     throw new RuntimeException(e);
   }
   ```


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400572798
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
 ##########
 @@ -25,7 +25,7 @@ private SalesforceConfigurationKeys() {
   }
   public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED =
       "source.querybased.salesforce.is.soft.deletes.pull.disabled";
-  public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+  public static final int DEFAULT_FETCH_RETRY_LIMIT = 1;
 
 Review comment:
   Why change the default here? It may affect existing users.


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400571542
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
 
 Review comment:
   Whether to retry forever should be configurable, with the default as false to match the existing behavior.


[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402685330
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -117,14 +155,17 @@ private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
       for (int j = 0; j < lineCount; j++) {
         lastSkippedLine = csvReader.nextRecord(); // skip these records
       }
-      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(
+          preLoadedLine))) {
         // check if last skipped line is same as the line before error
-        throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+        String msg = rootCause == null? "null" : rootCause.getMessage();
 
 Review comment:
   fixed.


[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400580149
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
 
 Review comment:
   This is only meant to avoid a peak in requests. In practice, it will never retry forever. We'd better not make users learn this concept.
   It is a resource management strategy.


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400572661
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -144,3 +178,24 @@ private void closeCsvReader() {
     }
   }
 }
+
+class OpenAndSeekException extends Exception {
+  private boolean _isCurrentExceptionExceedQuota;
+  public OpenAndSeekException(String msg, Throwable rootCause) {
+    super(msg, rootCause);
+    if (rootCause instanceof AsyncApiException &&
+        ((AsyncApiException) rootCause).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+      _isCurrentExceptionExceedQuota = true;
+    }
+  }
+  public OpenAndSeekException(String msg, Throwable rootCause, Exception currentException) {
+    super(msg, rootCause);
+    if (currentException instanceof AsyncApiException &&
+        ((AsyncApiException) currentException).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+      _isCurrentExceptionExceedQuota = true;
+    }
+  }
+  public boolean isCurrentExceptionExceedQuta() {
 
 Review comment:
   Quta -> Quota


[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402685467
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +69,52 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuota()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(retryExceedQuotaInterval);
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
 
 Review comment:
   Fixed. They are all good catches, thanks!


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r401268671
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
 
 Review comment:
   This affects resource utilization and latency, so I think users may want to control that based on their workload and priorities. For example, they may want the ETL job to fail immediately and not retry or to have a longer polling interval to reduce contention with higher priority jobs.
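
One way the requested configurability could look is sketched below. The class and both property keys are hypothetical, invented for this illustration only; the defaults assume "retry forever" is off (matching the earlier review request) and a 5-minute interval (the value hard-coded in the diff).

```java
import java.util.Properties;

class QuotaRetrySettings {
  // Hypothetical key names, used only for this illustration.
  static final String RETRY_FOREVER_KEY = "salesforce.retry.exceedQuota.forever";
  static final String RETRY_INTERVAL_KEY = "salesforce.retry.exceedQuota.intervalMillis";
  static final long DEFAULT_INTERVAL_MILLIS = 5 * 60 * 1000L; // 5 minutes, as in the diff

  final boolean retryForever;
  final long intervalMillis;

  QuotaRetrySettings(Properties props) {
    // Default to false so that existing jobs keep a bounded number of retries.
    this.retryForever = Boolean.parseBoolean(props.getProperty(RETRY_FOREVER_KEY, "false"));
    this.intervalMillis = Long.parseLong(
        props.getProperty(RETRY_INTERVAL_KEY, String.valueOf(DEFAULT_INTERVAL_MILLIS)));
  }

  // True when a quota failure should use up one of the limited attempts.
  boolean countsAgainstLimit() {
    return !retryForever;
  }
}
```

A job that should fail fast would leave the flag at its default and set a low retry limit; a low-priority job could raise the interval to reduce contention.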


[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400580522
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
+        }
+        log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
+      } catch (Exception e) {
+        // Retry may resolve other exceptions.
+        rootCause = e;
+        threadSleep(1 * 60 * 1000); // 1 minute
 
 Review comment:
   This is meant to be a random duration as well.
   This kind of failure will be retried only a limited number of times.
   Users wouldn't know what duration to set.
   The only way to find a better duration is to test for an optimal value. However, this is not a critical value, so we don't really have to find the optimal one.
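
A randomized duration of the kind mentioned here might be computed as in the sketch below. `JitteredDelay` and its parameters are hypothetical; the diff under review uses a fixed 1-minute sleep.

```java
import java.util.concurrent.ThreadLocalRandom;

class JitteredDelay {
  // Returns a delay in [baseMillis, baseMillis + maxJitterMillis).
  // With no jitter (maxJitterMillis <= 0) the base delay is returned unchanged.
  static long nextDelayMillis(long baseMillis, long maxJitterMillis) {
    if (maxJitterMillis <= 0) {
      return baseMillis;
    }
    return baseMillis + ThreadLocalRandom.current().nextLong(maxJitterMillis);
  }
}
```

Spreading the wake-up times this way keeps threads that failed together from retrying at the same instant.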



[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400571618
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
+        }
+        log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
+      } catch (Exception e) {
+        // Retry may resolve other exceptions.
+        rootCause = e;
+        threadSleep(1 * 60 * 1000); // 1 minute
 
 Review comment:
   This timeout should also be configurable.


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400572327
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
+        }
+        log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
+      } catch (Exception e) {
+        // Retry may resolve other exceptions.
+        rootCause = e;
+        threadSleep(1 * 60 * 1000); // 1 minute
+        log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
       }
     }
-    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+    if (executeCount == 1) {
+      throw new RuntimeException("***Fetch***: Failed", rootCause);
+    } else {
+      throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", rootCause);
+    }
+  }
+
+  private void threadSleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (Exception ee) {
+      log.warn("--sleep exception--");
 
 Review comment:
   Add the exception to the log message. You should reraise InterruptedException or shutdown of the process may be blocked.
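
A common Java idiom for what this comment asks is sketched below: catch `InterruptedException` specifically, restore the thread's interrupt flag, and propagate the failure. `InterruptAwareSleep` is a hypothetical helper, and a real version would also log the exception as requested.

```java
class InterruptAwareSleep {
  // Sleeps for the given time; on interruption, restores the interrupt flag
  // and rethrows so that callers and shutdown logic can still observe it.
  static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Thread.sleep cleared the flag; set it again
      throw new RuntimeException("Interrupted while waiting to retry", e);
    }
  }
}
```

Catching the broader `Exception` and only logging it would swallow the interruption, which is how a retry loop can end up blocking shutdown.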


[GitHub] [incubator-gobblin] arekusuri commented on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#issuecomment-607071075
 
 
   > This affects resource utilization and latency, so I think users may want to control that based on their workload and priorities. For example, they may want the ETL job to fail immediately and not retry or to have a longer polling interval to reduce contention with higher priority jobs.
   
   @htran1 thanks for your comment above.
   The sleep in the retry logic is there to avoid a peak in resource consumption. The sleep duration should be a random number. Nobody knows the right value to set (a lot of experiments might help find an optimal one).
   It does not depend on anything like table size.
   A flow job developer would not be able to figure out what value to set. Exposing the key to users would only confuse them more.
   
   > they may want the ETL job to fail immediately
   
   Very good question. We have the key `salesforce.fetchRetryLimit`; if it is set to 0, there is no retry. The default retry limit is 5.
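
To make the "0 means no retry" semantics concrete, here is a minimal sketch. The class `RetryLimitSketch` and its method are hypothetical and not Gobblin code; the sketch only mirrors the `retryLimit + 1` loop bound used in the diff, so a limit of 0 gives exactly one attempt and a limit of 5 gives up to six.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch, not Gobblin code: shows "retry limit 0 means a single attempt".
final class RetryLimitSketch {
  private RetryLimitSketch() {
  }

  /** Calls the task once, then retries up to retryLimit more times before giving up. */
  static <T> T callWithRetry(Callable<T> task, int retryLimit) {
    Exception last = null;
    for (int attempt = 0; attempt <= retryLimit; attempt++) {
      try {
        return task.call();
      } catch (Exception e) {
        last = e; // remember the most recent failure and try again
      }
    }
    throw new RuntimeException("Failed after " + (retryLimit + 1) + " attempt(s)", last);
  }
}
```

A job that wants to fail immediately would pass 0 here, which is the behavior described for `salesforce.fetchRetryLimit` above.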

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402679066
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -117,14 +155,17 @@ private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
       for (int j = 0; j < lineCount; j++) {
         lastSkippedLine = csvReader.nextRecord(); // skip these records
       }
-      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(
+          preLoadedLine))) {
         // check if last skipped line is same as the line before error
-        throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+        String msg = rootCause == null? "null" : rootCause.getMessage();
 
 Review comment:
   Add space between null and ?.

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400587927
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
 ##########
 @@ -25,7 +25,7 @@ private SalesforceConfigurationKeys() {
   }
   public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED =
       "source.querybased.salesforce.is.soft.deletes.pull.disabled";
-  public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+  public static final int DEFAULT_FETCH_RETRY_LIMIT = 1;
 
 Review comment:
   OK, let's restore it.

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402678830
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +69,52 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
 
 Review comment:
   Please remove the space before ++.

[GitHub] [incubator-gobblin] codecov-io commented on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#issuecomment-608165951
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2942?src=pr&el=h1) Report
   > Merging [#2942](https://codecov.io/gh/apache/incubator-gobblin/pull/2942?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/d54e66b857a34e871fd7c9ab28e37c9c9f061432&el=desc) will **decrease** coverage by `0.96%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/2942?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2942      +/-   ##
   ============================================
   - Coverage     45.58%   44.61%   -0.97%     
   + Complexity     9157     8989     -168     
   ============================================
     Files          1936     1936              
     Lines         73286    73325      +39     
     Branches       8088     8095       +7     
   ============================================
   - Hits          33409    32716     -693     
   - Misses        36782    37556     +774     
   + Partials       3095     3053      -42     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2942?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/gobblin/salesforce/BulkResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvQnVsa1Jlc3VsdEl0ZXJhdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...gobblin/runtime/mapreduce/GobblinOutputFormat.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbWFwcmVkdWNlL0dvYmJsaW5PdXRwdXRGb3JtYXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...askStateCollectorServiceHiveRegHandlerFactory.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvVGFza1N0YXRlQ29sbGVjdG9yU2VydmljZUhpdmVSZWdIYW5kbGVyRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...re/filesystem/FsDatasetStateStoreEntryManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbWV0YXN0b3JlL2ZpbGVzeXN0ZW0vRnNEYXRhc2V0U3RhdGVTdG9yZUVudHJ5TWFuYWdlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...in/runtime/mapreduce/CustomizedProgresserBase.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbWFwcmVkdWNlL0N1c3RvbWl6ZWRQcm9ncmVzc2VyQmFzZS5qYXZh) | `0.00% <0.00%> (-83.34%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/gobblin/runtime/ZkDatasetStateStore.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4taGVsaXgvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9aa0RhdGFzZXRTdGF0ZVN0b3JlLmphdmE=) | `0.00% <0.00%> (-80.77%)` | `0.00% <0.00%> (-7.00%)` | |
   | [...lin/runtime/locks/LegacyJobLockFactoryManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9ja3MvTGVnYWN5Sm9iTG9ja0ZhY3RvcnlNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (-78.58%)` | `0.00% <0.00%> (-2.00%)` | |
   | [.../apache/gobblin/metastore/ZkStateStoreFactory.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4taGVsaXgvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0YXN0b3JlL1prU3RhdGVTdG9yZUZhY3RvcnkuamF2YQ==) | `0.00% <0.00%> (-71.43%)` | `0.00% <0.00%> (-2.00%)` | |
   | ... and [42 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2942/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2942?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2942?src=pr&el=footer). Last update [d54e66b...bdc36d9](https://codecov.io/gh/apache/incubator-gobblin/pull/2942?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400579427
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
 
 Review comment:
   This argument is not critical. Because resources are insufficient, we need to wait for some arbitrary duration. 1 minute is OK, and 5 minutes is OK too; if it fails, we continue to wait until it succeeds.
   Users should not need to learn a new concept for this.
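
The behavior described here (quota failures wait and retry without using up the retry budget) can be sketched as follows. This is an illustration under assumptions, not the PR's code: `QuotaAwareRetry` and `QuotaExceededException` are hypothetical stand-ins for the iterator and the Salesforce ExceededQuota condition, and the sleep is passed in as a `LongConsumer` so the sketch can be exercised without real waiting.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

// Hypothetical sketch, not Gobblin code. QuotaExceededException stands in for the
// Salesforce ExceededQuota condition discussed in this thread.
final class QuotaAwareRetry {
  static final class QuotaExceededException extends Exception {
    QuotaExceededException(String msg) {
      super(msg);
    }
  }

  private QuotaAwareRetry() {
  }

  /**
   * Quota failures wait quotaWaitMillis and do not count against retryLimit;
   * every other failure consumes one attempt.
   */
  static <T> T call(Callable<T> task, int retryLimit, long quotaWaitMillis, LongConsumer sleeper) {
    Exception last = null;
    int countedAttempts = 0;
    while (countedAttempts <= retryLimit) {
      try {
        return task.call();
      } catch (QuotaExceededException e) {
        last = e;
        sleeper.accept(quotaWaitMillis); // wait for quota to free up, then try again
      } catch (Exception e) {
        last = e;
        countedAttempts++;
      }
    }
    throw new RuntimeException("Failed after " + countedAttempts + " counted attempt(s)", last);
  }
}
```

Note the trade-off raised in this review: if the quota never frees up, this loop never ends, so the wait interval directly affects how long a job holds its resources.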

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402679489
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +69,52 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuota()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(retryExceedQuotaInterval);
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
 
 Review comment:
   Remove space before --.

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402685295
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +69,52 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
 
 Review comment:
   fixed.

[GitHub] [incubator-gobblin] arekusuri commented on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#issuecomment-607488205
 
 
   @htran1 OK, I will add a key for the duration.
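
A minimal sketch of how such a key could be read, for illustration only. The key name `salesforce.retry.exceedQuotaInterval` and the class `RetryIntervalConfig` are hypothetical (the actual key added by the PR is not shown in this thread), `java.util.Properties` stands in for Gobblin's job state, and the 5-minute default is taken from the hard-coded `5 * 60 * 1000` in the diff.

```java
import java.util.Properties;

// Hypothetical sketch, not Gobblin code: key name and class are assumptions.
final class RetryIntervalConfig {
  static final String RETRY_EXCEED_QUOTA_INTERVAL_KEY = "salesforce.retry.exceedQuotaInterval";
  static final long DEFAULT_RETRY_EXCEED_QUOTA_INTERVAL_MS = 5L * 60 * 1000; // 5 minutes

  private RetryIntervalConfig() {
  }

  /** Returns the configured wait in milliseconds, or the default when the key is absent or blank. */
  static long retryExceedQuotaInterval(Properties props) {
    String raw = props.getProperty(RETRY_EXCEED_QUOTA_INTERVAL_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_RETRY_EXCEED_QUOTA_INTERVAL_MS;
    }
    long millis = Long.parseLong(raw.trim());
    if (millis < 0) {
      throw new IllegalArgumentException(RETRY_EXCEED_QUOTA_INTERVAL_KEY + " must not be negative: " + millis);
    }
    return millis;
  }
}
```

Keeping a default means existing jobs behave as before, while jobs that contend with higher-priority work can choose a longer interval.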

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400587342
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
+          executeCount --; // if the current exception is Quota Exceeded, keep trying forever
+        }
+        log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
+      } catch (Exception e) {
+        // Retry may resolve other exceptions.
+        rootCause = e;
+        threadSleep(1 * 60 * 1000); // 1 minute
+        log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = null; // in next loop, call openAndSeekCsvReader
       }
     }
-    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+    if (executeCount == 1) {
+      throw new RuntimeException("***Fetch***: Failed", rootCause);
+    } else {
+      throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", rootCause);
+    }
+  }
+
+  private void threadSleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (Exception ee) {
+      log.warn("--sleep exception--");
 
 Review comment:
   Changed to:
   ```
       try {
         Thread.sleep(millis);
       } catch (Exception e) {
         log.error("--Failed to sleep--", e);
         throw new RuntimeException(e);
       }
   ```

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400587667
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -144,3 +178,24 @@ private void closeCsvReader() {
     }
   }
 }
+
+class OpenAndSeekException extends Exception {
+  private boolean _isCurrentExceptionExceedQuota;
+  public OpenAndSeekException(String msg, Throwable rootCause) {
+    super(msg, rootCause);
+    if (rootCause instanceof AsyncApiException &&
+        ((AsyncApiException) rootCause).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+      _isCurrentExceptionExceedQuota = true;
+    }
+  }
+  public OpenAndSeekException(String msg, Throwable rootCause, Exception currentException) {
+    super(msg, rootCause);
+    if (currentException instanceof AsyncApiException &&
+        ((AsyncApiException) currentException).getExceptionCode() == AsyncExceptionCode.ExceededQuota) {
+      _isCurrentExceptionExceedQuota = true;
+    }
+  }
+  public boolean isCurrentExceptionExceedQuta() {
 
 Review comment:
   good catch! thanks! fixed.

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402679099
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -117,14 +155,17 @@ private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
       for (int j = 0; j < lineCount; j++) {
         lastSkippedLine = csvReader.nextRecord(); // skip these records
       }
-      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(
+          preLoadedLine))) {
         // check if last skipped line is same as the line before error
-        throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+        String msg = rootCause == null? "null" : rootCause.getMessage();
+        throw new OpenAndSeekException("Failed to verify last skipped line - root cause [" + msg + "]", rootCause);
       }
       return csvReader;
-    } catch (Exception e) { // failed to open reader and skip lineCount lines
-      exceptionRetryFor = exceptionRetryFor != null? exceptionRetryFor : e;
-      throw new RuntimeException("Failed to [" + e.getMessage() + "] - retrying for => " , exceptionRetryFor);
+
+    } catch (Exception currentException) { // failed to open reader and skip lineCount lines // ssl failures go here
+      Throwable cause = rootCause == null? currentException : rootCause;
 
 Review comment:
   Add space between null and ?.
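
For readers following the hunk above, the cause selection it introduces can be sketched in isolation. The class `RootCauseSketch` and its method names are hypothetical and not part of the PR; the sketch only shows the "keep the first failure as the cause" rule, with the spacing this comment asks for.

```java
// Hypothetical sketch, not Gobblin code.
final class RootCauseSketch {
  private RootCauseSketch() {
  }

  /** Returns the earlier failure if one exists, otherwise the current one. */
  static Throwable pickCause(Throwable rootCause, Throwable currentException) {
    return rootCause == null ? currentException : rootCause;
  }

  /** Wraps the current failure so the reported cause is always the first failure seen. */
  static RuntimeException wrap(Throwable rootCause, Exception currentException) {
    Throwable cause = pickCause(rootCause, currentException);
    return new RuntimeException("Failed to [" + currentException.getMessage() + "]", cause);
  }
}
```

The effect is that a stack trace printed after several retries shows one cause rather than a chain of one wrapper per retry.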

[GitHub] [incubator-gobblin] arekusuri edited a comment on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri edited a comment on issue #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#issuecomment-607071075
 
 
   > This affects resource utilization and latency, so I think users may want to control that based on their workload and priorities. For example, they may want the ETL job to fail immediately and not retry or to have a longer polling interval to reduce contention with higher priority jobs.
   
   @htran1 thanks for the comment above.
   The sleep in the retry logic is there to avoid a peak in resource consumption. The sleep duration should be a random number. Nobody knows the right value to set (running a lot of experiments might help find an optimal one).
   It does not depend on anything like table size.
   A flow job developer would not be able to figure out what value to set. Exposing the key to users would only confuse them more.
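
   To make the "random sleep duration" idea concrete, here is a minimal sketch of a jittered sleep interval. It is not code from this PR: the class name, the method name, and the percent-based jitter parameter are all assumptions chosen for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper, not part of Gobblin: picks a randomized sleep duration. */
final class JitteredSleepSketch {

  /**
   * Returns a duration drawn uniformly from
   * [baseMillis - spread, baseMillis + spread], where spread is
   * jitterPercent percent of baseMillis.
   */
  static long jitteredMillis(long baseMillis, int jitterPercent) {
    if (baseMillis <= 0 || jitterPercent < 0 || jitterPercent > 100) {
      throw new IllegalArgumentException("baseMillis must be > 0 and jitterPercent in [0, 100]");
    }
    long spread = baseMillis * jitterPercent / 100;
    if (spread == 0) {
      return baseMillis; // no jitter requested, or base too small to spread
    }
    // The upper bound of nextLong is exclusive, hence the + 1.
    return ThreadLocalRandom.current().nextLong(baseMillis - spread, baseMillis + spread + 1);
  }

  public static void main(String[] args) {
    // Assumed base of 5 minutes with 20 percent jitter: somewhere between 4 and 6 minutes.
    long sleep = jitteredMillis(5 * 60 * 1000L, 20);
    System.out.println("Would sleep for " + sleep + " ms");
  }
}
```

   Spreading the wake-up times this way keeps threads that hit the quota together from all retrying at the same instant.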
   
   > they may want the ETL job to fail immediately
   
   Good question. The key "salesforce.fetchRetryLimit" covers this: if it is set to 0, there is no retry. The default retry limit is 5.
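
   The retry-limit semantics can be sketched as follows. This is a simplified, hypothetical stand-in for the loop in `nextLineWithRetry()` (the class and method names are assumptions): the task always runs once, is retried at most `retryLimit` more times, and only the first failure is kept as the root cause.

```java
import java.util.concurrent.Callable;

/** Hypothetical sketch of the retry-limit semantics; not Gobblin's actual class. */
final class RetryLimitSketch {

  /**
   * Runs the task once, then retries up to retryLimit more times.
   * With retryLimit == 0 the task is attempted exactly once.
   * Only the first failure is kept and reported as the cause.
   */
  static <T> T callWithRetry(int retryLimit, Callable<T> task) {
    Throwable rootCause = null;
    int executeCount = 0;
    while (executeCount < retryLimit + 1) {
      executeCount++;
      try {
        return task.call();
      } catch (Exception e) {
        if (rootCause == null) {
          rootCause = e; // keep the first failure, drop later retry failures
        }
      }
    }
    throw new RuntimeException("Failed after " + executeCount + " attempt(s)", rootCause);
  }

  public static void main(String[] args) {
    int[] calls = {0};
    try {
      // retryLimit = 0: a single attempt, no retry.
      callWithRetry(0, () -> {
        calls[0]++;
        throw new java.io.IOException("simulated failure");
      });
    } catch (RuntimeException e) {
      System.out.println(e.getMessage() + ", calls made: " + calls[0]);
    }
  }
}
```

   With the default limit of 5, a task that always fails would be attempted six times in this sketch before the exception is thrown.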


[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r402685355
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -117,14 +155,17 @@ private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
       for (int j = 0; j < lineCount; j++) {
         lastSkippedLine = csvReader.nextRecord(); // skip these records
       }
-      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(
+          preLoadedLine))) {
         // check if last skipped line is same as the line before error
-        throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+        String msg = rootCause == null? "null" : rootCause.getMessage();
+        throw new OpenAndSeekException("Failed to verify last skipped line - root cause [" + msg + "]", rootCause);
       }
       return csvReader;
-    } catch (Exception e) { // failed to open reader and skip lineCount lines
-      exceptionRetryFor = exceptionRetryFor != null? exceptionRetryFor : e;
-      throw new RuntimeException("Failed to [" + e.getMessage() + "] - retrying for => " , exceptionRetryFor);
+
+    } catch (Exception currentException) { // failed to open reader and skip lineCount lines // ssl failures go here
+      Throwable cause = rootCause == null? currentException : rootCause;
 
 Review comment:
   fixed.


[GitHub] [incubator-gobblin] asfgit closed pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942
 
 
   


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r400571363
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
 
 Review comment:
   Please make the timeout configurable. Default can be set to 5 minutes.
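
   One way the request above could look, as a minimal sketch only: the sleep duration is read from a property and falls back to 5 minutes when the property is absent. The key name `salesforce.exceedQuotaSleepMillis` and the class below are hypothetical; the discussion does not name a key, and plain `java.util.Properties` stands in here for the job's configuration object.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a configurable ExceedQuota sleep; not the merged code. */
final class QuotaSleepConfigSketch {

  // Assumed key name, chosen for illustration only.
  static final String SLEEP_KEY = "salesforce.exceedQuotaSleepMillis";

  // Default of 5 minutes, matching the hard-coded value in the diff.
  static final long DEFAULT_SLEEP_MILLIS = TimeUnit.MINUTES.toMillis(5);

  /** Returns the configured sleep in milliseconds, or the default if unset. */
  static long sleepMillis(Properties props) {
    String raw = props.getProperty(SLEEP_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_SLEEP_MILLIS;
    }
    long value = Long.parseLong(raw.trim());
    if (value < 0) {
      throw new IllegalArgumentException(SLEEP_KEY + " must not be negative: " + value);
    }
    return value;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println("default:  " + sleepMillis(props) + " ms");
    props.setProperty(SLEEP_KEY, "30000"); // a user opting for a shorter wait
    System.out.println("override: " + sleepMillis(props) + " ms");
  }
}
```

   Users who never set the key see the same 5-minute behavior as the hard-coded version, while jobs that share the throttled resource with other consumers can tune it.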


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2942: GOBBLIN-1101(DSS-25241): Enhance bulk api retry for ExceedQuota
URL: https://github.com/apache/incubator-gobblin/pull/2942#discussion_r401901802
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -62,24 +65,51 @@ private void initHeader() {
   }
 
   private List<String> nextLineWithRetry() {
-    Exception exception = null;
-    for (int i = 0; i < retryLimit; i++) {
+    Throwable rootCause = null;
+    int executeCount = 0;
+    while (executeCount < retryLimit + 1) {
+      executeCount ++;
       try {
         if (this.csvReader == null) {
-          this.csvReader = openAndSeekCsvReader(null);
+          this.csvReader = openAndSeekCsvReader(rootCause);
         }
         List<String> line = this.csvReader.nextRecord();
         this.lineCount++;
         return line;
       } catch (InputStreamCSVReader.CSVParseException e) {
         throw new RuntimeException(e); // don't retry if it is parse error
-      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
-        exception = e;
-        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
-        this.csvReader = openAndSeekCsvReader(e);
+      } catch (OpenAndSeekException e) {
+        rootCause = e.getCause();
+        // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
+        // any new synchronous Apex request results in a runtime exception.
+        if (e.isCurrentExceptionExceedQuta()) {
+          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          threadSleep(5 * 60 * 1000); // 5 minutes
 
 Review comment:
   It is not only about Gobblin ETL jobs. There can be other consumers of the resource that is throttled. Having a configuration key with default value does not make the user experience any worse and allows tuning when required.
