Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/01/14 19:49:07 UTC

[GitHub] [incubator-gobblin] arekusuri opened a new pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

arekusuri opened a new pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   https://issues.apache.org/jira/browse/GOBBLIN-1025
   
   
   ### Description
   In the SFDC connector, there is a class called `ResultIterator` (I will rename it to `SalesforceRecordIterator`).
   It is currently used only by PK-Chunking. It encapsulates fetching a list of result files into a single record iterator.
   
   However, `csvReader.nextRecord()` may throw a network I/O exception. We should retry in this case.
   
   When a result file has been only partially fetched and a network I/O exception occurs, we are in a special situation: the first part of the file has already been fetched locally, but the rest of the file is still on the data source.
   We need to
   1. reopen the file stream;
   2. skip all the records we already fetched, i.e. move the cursor to the first record we haven't fetched yet.
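
A minimal Java sketch of the two recovery steps above. It is illustrative only and not the code from this PR: `StreamOpener` is a hypothetical hook standing in for whatever reopens a Bulk API result file, and each record is assumed to be one line (a real CSV reader, like the one behind `csvReader.nextRecord()`, must also handle quoted line breaks).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical hook that (re)opens a remote result file from its beginning. */
interface StreamOpener {
  InputStream open() throws IOException;
}

/**
 * Iterates over the records (assumed: one per line) of a remote file. On an IOException it
 * 1. reopens the stream and 2. skips the records already returned, then continues.
 */
class RetryingRecordIterator implements Iterator<String> {
  private final StreamOpener opener;
  private final int retryLimit;
  private BufferedReader reader;   // null means "must (re)open before reading"
  private long fetchedCount = 0;   // records already returned to the caller
  private String lookahead;        // record read by hasNext() but not yet returned
  private boolean exhausted = false;

  RetryingRecordIterator(StreamOpener opener, int retryLimit) {
    this.opener = opener;
    this.retryLimit = retryLimit;
  }

  @Override
  public boolean hasNext() {
    if (lookahead == null && !exhausted) {
      lookahead = readWithRetry();
      if (lookahead == null) {
        exhausted = true;
        closeQuietly();
      }
    }
    return lookahead != null;
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    String record = lookahead;
    lookahead = null;
    fetchedCount++;
    return record;
  }

  private String readWithRetry() {
    int failures = 0;
    while (true) {
      try {
        if (reader == null) {
          reopenAndSkip();
        }
        return reader.readLine();   // null at end of file
      } catch (IOException e) {
        closeQuietly();             // drop the broken stream; the next attempt reopens it
        if (++failures > retryLimit) {
          throw new UncheckedIOException("Gave up after " + retryLimit + " retries", e);
        }
      }
    }
  }

  /** Step 1: reopen the stream. Step 2: skip every record that was already returned. */
  private void reopenAndSkip() throws IOException {
    reader = new BufferedReader(new InputStreamReader(opener.open(), StandardCharsets.UTF_8));
    for (long i = 0; i < fetchedCount; i++) {
      if (reader.readLine() == null) {
        throw new IOException("File ended while skipping already-fetched records");
      }
    }
  }

  private void closeQuietly() {
    if (reader != null) {
      try {
        reader.close();
      } catch (IOException ignored) {
        // best effort: the stream is being discarded anyway
      }
      reader = null;
    }
  }
}
```

Skipping by record count keeps the bookkeeping to a single counter, at the cost of re-downloading the already-fetched prefix on every retry.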
   
   ### Tests
   https://ltx1-holdemaz05.grid.linkedin.com:8443/executor?execid=21956300&job=salesforce_task_full&attempt=0
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372591935
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
 ##########
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+  private int recordCount = 0;
+  private SalesforceExtractor extractor;
+  private String schema;
+  private String entity;
+  private WorkUnit workUnit;
+  private List<Predicate> predicateList;
+
+  private Iterator<JsonElement> queryResultIter;
+
+  public QueryResultIterator(
+      SalesforceExtractor extractor,
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+  ) {
+    log.info("create query result iterator.");
+    this.extractor = extractor;
+    this.schema = schema;
+    this.entity = entity;
+    this.workUnit = workUnit;
+    this.predicateList = predicateList;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Soft delete records total:{}", recordCount);
+    }
+    return queryResultIter.hasNext();
+  }
+
+  private void initQueryResultIter() {
+    try {
+      log.info("Pull soft delete records");
+      queryResultIter = extractor.getRecordSet(schema, entity, workUnit, predicateList);
+    } catch (DataRecordException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public JsonElement next() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    recordCount ++;
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Soft delete records total:{}", recordCount);
 
 Review comment:
   fixed. thanks

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372554615
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.Iterators;
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Result Iterator.
 
 Review comment:
   Please have a less generic comment.

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372595268
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.Iterators;
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Result Iterator.
 
 Review comment:
   fixed, thanks!
   
   >  * The iterator that chains all result iterators together.
   >  * It creates a single iterator over the list of result files returned by the Bulk API.
   >  * Other iterators can also be appended with the `add` function, so that everything is combined into one iterator.
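
The chaining behavior described in this Javadoc can be sketched as below. The class is a hypothetical stand-in for `ResultChainingIterator`, not its actual implementation: it keeps a queue of pending iterators and drains them in order, with `add` appending to the queue. For a fixed set of inputs, Guava's `Iterators.concat` (the PR's file imports `com.google.common.collect.Iterators`) provides similar chaining.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical sketch: chains several iterators into one; more can be appended with add(). */
class ChainingIterator<T> implements Iterator<T> {
  private final Deque<Iterator<T>> pending = new ArrayDeque<>();
  private Iterator<T> current = null;

  /** Appends another iterator; its elements follow all previously added ones. */
  void add(Iterator<T> iterator) {
    if (iterator != null) {   // assumption: a null iterator means "no data"
      pending.addLast(iterator);
    }
  }

  @Override
  public boolean hasNext() {
    // Advance past exhausted iterators until one has data or none remain.
    while (current == null || !current.hasNext()) {
      current = pending.pollFirst();
      if (current == null) {
        return false;
      }
    }
    return true;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}
```

Because each queued iterator is only asked `hasNext()` once the chain reaches it, a lazily initialized iterator in the queue does not start fetching early.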

[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
   > Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/c7e2c670734ea1009dace621ecb137219476ce29?src=pr&el=desc) will **decrease** coverage by `<.01%`.
   > The diff coverage is `0%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2868      +/-   ##
   ============================================
   - Coverage     45.79%   45.78%   -0.01%     
   - Complexity     9108     9110       +2     
   ============================================
     Files          1915     1915              
     Lines         72267    72294      +27     
     Branches       7969     7971       +2     
   ============================================
   + Hits          33095    33103       +8     
   - Misses        36149    36166      +17     
   - Partials       3023     3025       +2
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
   | [...e/gobblin/salesforce/SalesforceRecordIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVJlY29yZEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `85.71% <0%> (-7.15%)` | `3% <0%> (ø)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.81% <0%> (-0.47%)` | `28% <0%> (ø)` | |
   | [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `62.85% <0%> (+1.42%)` | `4% <0%> (ø)` | :arrow_down: |
   | [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `72.22% <0%> (+2.22%)` | `13% <0%> (ø)` | :arrow_down: |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `39.25% <0%> (+6.54%)` | `13% <0%> (+2%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [c7e2c67...c2b7840](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372589295
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
 ##########
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+  private int recordCount = 0;
+  private SalesforceExtractor extractor;
+  private String schema;
+  private String entity;
+  private WorkUnit workUnit;
+  private List<Predicate> predicateList;
+
+  private Iterator<JsonElement> queryResultIter;
+
+  public QueryResultIterator(
+      SalesforceExtractor extractor,
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+  ) {
+    log.info("create query result iterator.");
+    this.extractor = extractor;
+    this.schema = schema;
+    this.entity = entity;
+    this.workUnit = workUnit;
+    this.predicateList = predicateList;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Soft delete records total:{}", recordCount);
+    }
+    return queryResultIter.hasNext();
+  }
+
+  private void initQueryResultIter() {
+    try {
+      log.info("Pull soft delete records");
 
 Review comment:
   Good catch! Fixed it: the iterator itself is general-purpose.
   In our case we only use it for soft deletes.
   `QueryResultIterator` is an iterator wrapper for the function `RestApiExtractor.getRecordSet()`.
   `getRecordSet` is called in two places:
   1. soft delete
   2. the `QueryBasedExtractor.getIterator` function (used in simple mode)

[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
   > Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/66e201ceefad1b97fcad83b50f2954e48ef2d0f4?src=pr&el=desc) will **increase** coverage by `0.02%`.
   > The diff coverage is `0%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2868      +/-   ##
   ============================================
   + Coverage     45.79%   45.81%   +0.02%     
   - Complexity     9109     9111       +2     
   ============================================
     Files          1915     1918       +3     
     Lines         72293    72270      -23     
     Branches       7974     7959      -15     
   ============================================
   + Hits          33106    33113       +7     
   + Misses        36161    36133      -28     
   + Partials       3026     3024       -2
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
   | [...apache/gobblin/salesforce/QueryResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUXVlcnlSZXN1bHRJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [.../apache/gobblin/salesforce/BulkResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvQnVsa1Jlc3VsdEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...n/source/extractor/utils/InputStreamCSVReader.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvdXRpbHMvSW5wdXRTdHJlYW1DU1ZSZWFkZXIuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...n/java/org/apache/gobblin/salesforce/FileIdVO.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvRmlsZUlkVk8uamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.22% <0%> (ø)` | `27% <0%> (-1%)` | :arrow_down: |
   | ... and [6 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [66e201c...863c6d4](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372592294
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
 ##########
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
 
 Review comment:
   I changed the Javadoc:
   
   > Iterator for REST API queries.
   > It is a wrapper of
   > RestApiExtractor.getRecordSet(schema, entity, workUnit, predicateList).
   > We want a wrapper for this function because we want to delay its execution: the data is fetched only when `next` gets called.
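
The deferred execution described in this Javadoc can be sketched with a `java.util.function.Supplier`: the wrapped call runs only on the first `hasNext()` or `next()`. This is a hypothetical illustration, not the PR's `QueryResultIterator`; the supplier stands in for `getRecordSet(schema, entity, workUnit, predicateList)`, and treating a `null` result as "no data" is an assumption modeled on the `return null` convention in `SalesforceExtractor`'s fetch methods.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Hypothetical sketch: defers creating the underlying iterator until first use. */
class LazyIterator<T> implements Iterator<T> {
  private final Supplier<Iterator<T>> source;  // stands in for the expensive fetch call
  private Iterator<T> delegate;                // null until first hasNext()/next()
  private long count = 0;                      // records actually returned so far

  LazyIterator(Supplier<Iterator<T>> source) {
    this.source = source;
  }

  private Iterator<T> delegate() {
    if (delegate == null) {
      Iterator<T> it = source.get();
      // Assumption: a null result means "no data"; use an empty iterator to avoid NPEs.
      delegate = (it == null) ? Collections.<T>emptyIterator() : it;
    }
    return delegate;
  }

  @Override
  public boolean hasNext() {
    return delegate().hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    count++;   // incremented only after the hasNext() check, so the total stays exact
    return delegate().next();
  }

  long count() {
    return count;
  }
}
```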

[GitHub] [incubator-gobblin] asfgit closed pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868
 
 
   

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369747865
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##########
 @@ -563,92 +547,95 @@ public String getTimestampPredicateCondition(String column, long value, String v
     return dataTypeMap;
   }
 
-
   private Boolean isPkChunkingFetchDone = false;
 
-  private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+  private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
     if (isPkChunkingFetchDone) {
       return null; // must return null to represent no more data.
     }
+    log.info("----Get records for pk-chunking----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
     isPkChunkingFetchDone = true; // set to true, never come here twice.
+    bulkApiLogin();
+    String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+    String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+    List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
+    return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+  }
+
+  private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
+    return Arrays.stream(batchIdResultIdString.split(","))
+        .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+        .collect(Collectors.toList());
+  }
 
+  private Boolean isBulkFetchDone = false;
+
+  private Iterator<JsonElement> fetchRecordSet(
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+) {
+    if (isBulkFetchDone) {
+      return null; // need to return null to indicate no more data.
+    }
+    isBulkFetchDone = true;
+    log.info("----Get records for bulk batch job----");
     try {
-      if (!bulkApiLogin()) {
-        throw new IllegalArgumentException("Invalid Login");
-      }
+      // set finish status to false before starting the bulk job
+      this.setBulkJobFinished(false);
+      this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+      log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
+      List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
 
 Review comment:
   Is the `o` in fileIdVoList intentional or is it supposed to be a zero?
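
The `parseBatchIdResultIdString` helper in this hunk turns a comma-separated list of `batchId:resultId` pairs into file IDs. The standalone sketch below mirrors that format under stated assumptions: `FileId` is a hypothetical stand-in for `FileIdVO` (whose fields are not shown in this hunk), and, unlike the stream-based original, it trims whitespace and rejects malformed pairs with an `IllegalArgumentException` rather than failing on `x[1]` with an `ArrayIndexOutOfBoundsException`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for FileIdVO: identifies one result file of a bulk job. */
final class FileId {
  final String jobId;
  final String batchId;
  final String resultId;

  FileId(String jobId, String batchId, String resultId) {
    this.jobId = jobId;
    this.batchId = batchId;
    this.resultId = resultId;
  }

  @Override
  public String toString() {
    return jobId + "/" + batchId + "/" + resultId;
  }
}

final class BatchResultParser {
  /** Parses "batch1:result1,batch2:result2" into one FileId per pair. */
  static List<FileId> parse(String jobId, String pairs) {
    List<FileId> ids = new ArrayList<>();
    for (String pair : pairs.split(",")) {
      String[] parts = pair.trim().split(":");
      // Reject entries without exactly one non-empty batch ID and one non-empty result ID.
      if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
        throw new IllegalArgumentException("Malformed batchId:resultId pair: '" + pair + "'");
      }
      ids.add(new FileId(jobId, parts[0], parts[1]));
    }
    return ids;
  }
}
```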

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372592294
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
 ##########
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
 
 Review comment:
   I changed the Javadoc to:
   
   > Iterator for REST API query results.
   > It is a wrapper of
   > RestApiExtractor.getRecordSet(schema, entity, workUnit, predicateList).
   > The reason we want a wrapper for this function is
   > to delay its execution: the data is fetched only on the first call to next() or hasNext().
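The deferred fetch described in this Javadoc can be sketched as a generic lazy iterator. This is a minimal illustration, not the PR's `QueryResultIterator`: `LazyIterator` is a hypothetical name, and the `Supplier` stands in for the call to `getRecordSet(...)`. Because that call throws the checked `DataRecordException`, a supplier would have to wrap it in an unchecked exception, as `initQueryResultIter()` does.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Defers an expensive fetch until the iterator is first used (illustrative sketch). */
final class LazyIterator<T> implements Iterator<T> {
  private final Supplier<Iterator<T>> fetcher; // the deferred call; never run in the constructor
  private Iterator<T> delegate;                // stays null until the first hasNext()/next()

  LazyIterator(Supplier<Iterator<T>> fetcher) {
    this.fetcher = fetcher;
  }

  private Iterator<T> ensureFetched() {
    if (delegate == null) {
      Iterator<T> fetched = fetcher.get(); // runs at most once
      // Treat a null result as "no data" instead of failing later with a NullPointerException.
      delegate = (fetched == null) ? Collections.<T>emptyIterator() : fetched;
    }
    return delegate;
  }

  @Override
  public boolean hasNext() {
    return ensureFetched().hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }
}
```

Nothing is fetched at construction time; whichever of `hasNext()` or `next()` is called first triggers exactly one fetch.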

----------------------------------------------------------------

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372550130
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
 ##########
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+  private int recordCount = 0;
+  private SalesforceExtractor extractor;
+  private String schema;
+  private String entity;
+  private WorkUnit workUnit;
+  private List<Predicate> predicateList;
+
+  private Iterator<JsonElement> queryResultIter;
+
+  public QueryResultIterator(
+      SalesforceExtractor extractor,
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+  ) {
+    log.info("create query result iterator.");
+    this.extractor = extractor;
+    this.schema = schema;
+    this.entity = entity;
+    this.workUnit = workUnit;
+    this.predicateList = predicateList;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Soft delete records total:{}", recordCount);
+    }
+    return queryResultIter.hasNext();
+  }
+
+  private void initQueryResultIter() {
+    try {
+      log.info("Pull soft delete records");
+      queryResultIter = extractor.getRecordSet(schema, entity, workUnit, predicateList);
+    } catch (DataRecordException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public JsonElement next() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    recordCount ++;
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Soft delete records total:{}", recordCount);
 
 Review comment:
   Same here. How come this class mentions soft delete?

----------------------------------------------------------------

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369749915
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -0,0 +1,115 @@
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+  private FileIdVO fileIdVO;
+  private int retryLimit;
+  private BulkConnection conn;
+  private InputStreamCSVReader csvReader;
+  private List<String> header;
+  private int columnSize;
+  private int lineCount = 0; // this is different than currentFileRowCount. cvs file has header
+  private List<String> preLoadedLine = null;
+
+  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+    log.info("create BulkResultIterator: " + fileIdVO);
+    this.conn = conn;
+    this.fileIdVO = fileIdVO;
+    this.retryLimit = retryLimit;
+  }
+
+  /**
+   * read first data record from cvsReader and initiate header
+   * not supposed to do it in constructor function, for delay creating file stream
+   */
+  private void initHeader() {
+    this.header = this.nextLine(); // first line is header
+    this.columnSize = this.header.size();
+    this.preLoadedLine = this.nextLine(); // initialize: buffer one record data
+  }
+
+  private List<String> nextLine() {
+    Exception exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        if (this.csvReader == null) {
+          this.csvReader = openAndSeekCsvReader();
+        }
+        List<String> line = this.csvReader.nextRecord();
+        this.lineCount++;
+        return line;
+      } catch (InputStreamCSVReader.CSVParseException e) {
+        throw new RuntimeException(e); // don't retry if it is parse error
+      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
+        exception = e;
+        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = openAndSeekCsvReader();
+      }
+    }
+    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (this.header == null) {
+      initHeader();
+    }
+    return this.preLoadedLine != null;
+  }
+
+  @Override
+  public JsonElement next() {
+    if (this.header == null) {
+      initHeader();
+    }
+    JsonElement jsonObject = Utils.csvToJsonObject(this.header, this.preLoadedLine, this.columnSize);
+    this.preLoadedLine = this.nextLine();
+    if (this.preLoadedLine == null) {
+      log.info("----Record count: [{}] for {}", lineCount - 1, fileIdVO);
+    }
+    return jsonObject;
+  }
+
+  private InputStreamCSVReader openAndSeekCsvReader() {
+    String jobId = fileIdVO.getJobId();
+    String batchId = fileIdVO.getBatchId();
+    String resultId = fileIdVO.getResultId();
+    log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+    closeCsvReader();
+    try {
+      InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+      BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+      csvReader = new InputStreamCSVReader(br);
+      for (int j = 0; j < lineCount; j++) {
+        csvReader.nextRecord(); // skip these record
+      }
+      // failed to skip seekLineNumber lines, try to open the file again. same time count+1
+      return csvReader;
 
 Review comment:
   I don't see the sanity check that, after skipping, the position matches the prior position. This was the old logic:
   
         for (int i = 0; i < recordsToSkip; i++) {
           lastCsvRecord = reader.nextRecord();
         }
   
         // make sure the last record processed before the error was the last record skipped so that the next
         // unprocessed record is processed in the next call to fetchResultBatch()
         if (recordsToSkip > 0) {
           if (!this.csvRecord.equals(lastCsvRecord)) {
             throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
           }
         }
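A sketch of reopen-and-skip with the sanity check from the old logic restored. The names are hypothetical: `RecordSource` stands in for `InputStreamCSVReader` (its `nextRecord()` is assumed to return `null` at end of stream, as `BulkResultIterator` in the diff appears to rely on), and `SourceOpener` stands in for the `conn.getQueryResultStream(...)` call. The caller passes the number of records already read from the stream (including the header, if the count includes it) and the last record read before the failure.

```java
import java.io.IOException;
import java.util.List;
import java.util.Objects;

/** Reopens a result stream and repositions it, verifying the position (illustrative sketch). */
final class Repositioner {
  /** Hypothetical minimal reader: returns null at end of stream. */
  interface RecordSource {
    List<String> nextRecord() throws IOException;
  }

  /** Hypothetical factory that reopens the result stream from the beginning. */
  interface SourceOpener {
    RecordSource open() throws IOException;
  }

  private Repositioner() {}

  static RecordSource reopenAndSeek(SourceOpener opener, int recordsToSkip, List<String> lastRead)
      throws IOException {
    RecordSource source = opener.open();
    List<String> lastSkipped = null;
    for (int i = 0; i < recordsToSkip; i++) {
      lastSkipped = source.nextRecord();
      if (lastSkipped == null) {
        throw new IllegalStateException(
            "Stream ended after " + i + " records; expected to skip " + recordsToSkip);
      }
    }
    // The next record returned is the first unread one only if the last skipped record
    // equals the last record read before the connection failed.
    if (recordsToSkip > 0 && !Objects.equals(lastSkipped, lastRead)) {
      throw new IllegalStateException(
          "Repositioning after reconnecting did not point to the expected record");
    }
    return source;
  }
}
```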

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
   > Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/357d1db84c4601b3e50f7d26a05d8ce282c5159d?src=pr&el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `0%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2868      +/-   ##
   ============================================
   - Coverage     45.75%   45.73%   -0.02%     
     Complexity     9103     9103              
   ============================================
     Files          1917     1917              
     Lines         72131    72158      +27     
     Branches       7956     7958       +2     
   ============================================
   + Hits          33003    33004       +1     
   - Misses        36102    36131      +29     
   + Partials       3026     3023       -3
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
   | [...e/gobblin/salesforce/SalesforceRecordIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVJlY29yZEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `32.71% <0%> (-2.81%)` | `11% <0%> (-1%)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.81% <0%> (-0.47%)` | `28% <0%> (ø)` | |
   | [.../apache/gobblin/runtime/api/JobExecutionState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkV4ZWN1dGlvblN0YXRlLmphdmE=) | `80.37% <0%> (+0.93%)` | `24% <0%> (ø)` | :arrow_down: |
   | [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `62.85% <0%> (+1.42%)` | `4% <0%> (ø)` | :arrow_down: |
   | [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `72.22% <0%> (+2.22%)` | `13% <0%> (ø)` | :arrow_down: |
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0%> (+7.14%)` | `3% <0%> (ø)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [357d1db...501d178](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372548890
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
 ##########
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+  private int recordCount = 0;
+  private SalesforceExtractor extractor;
+  private String schema;
+  private String entity;
+  private WorkUnit workUnit;
+  private List<Predicate> predicateList;
+
+  private Iterator<JsonElement> queryResultIter;
+
+  public QueryResultIterator(
+      SalesforceExtractor extractor,
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+  ) {
+    log.info("create query result iterator.");
+    this.extractor = extractor;
+    this.schema = schema;
+    this.entity = entity;
+    this.workUnit = workUnit;
+    this.predicateList = predicateList;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Soft delete records total:{}", recordCount);
+    }
+    return queryResultIter.hasNext();
+  }
+
+  private void initQueryResultIter() {
+    try {
+      log.info("Pull soft delete records");
 
 Review comment:
   Is this the right log message?

----------------------------------------------------------------

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369859942
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##########
 @@ -563,92 +547,95 @@ public String getTimestampPredicateCondition(String column, long value, String v
     return dataTypeMap;
   }
 
-
   private Boolean isPkChunkingFetchDone = false;
 
-  private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+  private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
     if (isPkChunkingFetchDone) {
       return null; // must return null to represent no more data.
     }
+    log.info("----Get records for pk-chunking----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
     isPkChunkingFetchDone = true; // set to true, never come here twice.
+    bulkApiLogin();
+    String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+    String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+    List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
+    return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+  }
+
+  private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
+    return Arrays.stream(batchIdResultIdString.split(","))
+        .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+        .collect(Collectors.toList());
+  }
 
+  private Boolean isBulkFetchDone = false;
+
+  private Iterator<JsonElement> fetchRecordSet(
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+) {
+    if (isBulkFetchDone) {
+      return null; // need to return null to indicate no more data.
+    }
+    isBulkFetchDone = true;
+    log.info("----Get records for bulk batch job----");
     try {
-      if (!bulkApiLogin()) {
-        throw new IllegalArgumentException("Invalid Login");
-      }
+      // set finish status to false before starting the bulk job
+      this.setBulkJobFinished(false);
+      this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+      log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
+      List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
 
 Review comment:
   FileIdVO: "VO" stands for "Value Object", i.e. a plain object used only to transfer data.
   fileIdVoList: fileIdVOList looked awkward, so I lower-cased the "O".
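To illustrate the value-object idea, here is a minimal immutable sketch under a hypothetical name (`ResultFileIdVO`, not the PR's `FileIdVO`), together with parsing of the `batchId:resultId,...` string that `parseBatchIdResultIdString` in the diff above splits on `,` and `:`. The validation of malformed pairs is an addition for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Immutable value object identifying one Bulk API result file (illustrative sketch). */
final class ResultFileIdVO {
  private final String jobId;
  private final String batchId;
  private final String resultId;

  ResultFileIdVO(String jobId, String batchId, String resultId) {
    this.jobId = Objects.requireNonNull(jobId);
    this.batchId = Objects.requireNonNull(batchId);
    this.resultId = Objects.requireNonNull(resultId);
  }

  String getJobId() { return jobId; }
  String getBatchId() { return batchId; }
  String getResultId() { return resultId; }

  /** Parses "batchId:resultId,batchId:resultId,..." into one value object per result file. */
  static List<ResultFileIdVO> parsePairs(String jobId, String pairs) {
    List<ResultFileIdVO> ids = new ArrayList<>();
    for (String pair : pairs.split(",")) {
      String[] parts = pair.trim().split(":");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Malformed batchId:resultId pair: " + pair);
      }
      ids.add(new ResultFileIdVO(jobId, parts[0], parts[1]));
    }
    return ids;
  }

  // Value semantics: two instances with the same fields are equal.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ResultFileIdVO)) return false;
    ResultFileIdVO other = (ResultFileIdVO) o;
    return jobId.equals(other.jobId) && batchId.equals(other.batchId) && resultId.equals(other.resultId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(jobId, batchId, resultId);
  }

  @Override
  public String toString() {
    return "[jobId=" + jobId + ", batchId=" + batchId + ", resultId=" + resultId + "]";
  }
}
```

Field-based equality is what makes this a value object rather than an entity. Lombok's `@Value` annotation can generate the same constructor, getters, `equals`, `hashCode`, and `toString`.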

----------------------------------------------------------------

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369302945
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceRecordIterator.java
 ##########
 @@ -99,7 +103,40 @@ private void fulfillCurrentRecord() {
     }
   }
 
-  private void resetCurrentRecordStatus() {
+  private InputStreamCSVReader reopenStreamWithRetry(ResultStruct resultStruct, int seekLineNumber, int retryNumber, int count) {
 
 Review comment:
   Thanks! I refactored the code so that both PK-chunking and the normal bulk batch path use an Iterator.
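Running both paths through one `Iterator` largely comes down to chaining per-file iterators. A minimal sketch of that idea follows; `ChainingIterator` and its parameters are hypothetical and not the PR's `ResultChainingIterator`. Each file's iterator is created only when the previous one is exhausted, and empty files are skipped.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Chains lazily created per-file iterators into one record stream (illustrative sketch). */
final class ChainingIterator<K, T> implements Iterator<T> {
  private final Iterator<K> keys;                   // one key per result file, e.g. a file id
  private final Function<K, Iterator<T>> openFile;  // creates the record iterator for one file
  private Iterator<T> current = Collections.emptyIterator();

  ChainingIterator(List<K> keys, Function<K, Iterator<T>> openFile) {
    this.keys = keys.iterator();
    this.openFile = openFile;
  }

  @Override
  public boolean hasNext() {
    // Open the next file only when the current one is exhausted; loop past empty files.
    while (!current.hasNext() && keys.hasNext()) {
      current = openFile.apply(keys.next());
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}
```

If Guava is already on the classpath, `Iterators.concat` over `Iterators.transform(keys, openFile)` gives the same lazy behavior.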

----------------------------------------------------------------

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r367705299
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
 ##########
 @@ -26,12 +26,12 @@ private SalesforceConfigurationKeys() {
   public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
 
   // pk-chunking
-  public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
-  public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
+  public static final String BULK_TEST_JOB_ID = "salesforce.bulk.test.job.id";
 
 Review comment:
   Thanks! Changed to:
   ```
   public static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";
   public static final String BULK_TEST_BATCH_ID_LIST = "salesforce.bulk.testBatchIds";
   ```

----------------------------------------------------------------

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372553737
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
 ##########
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
 
 Review comment:
   The class name and comments suggest this is for fetching REST API query results in general, but the code below has logging specific to fetching "soft delete records". If this class is specific to soft delete records, that should be reflected in the class name and comment.
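One way to keep the class generic is to let the caller supply the label used in its log line. The sketch below is hypothetical (`CountingIterator`, with a `Consumer<String>` standing in for the SLF4J logger). It also logs the total exactly once, whereas the quoted `hasNext()` logs it on every call once the data is exhausted.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

/** Counts records from any iterator and logs the total once, under a caller-supplied label. */
final class CountingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final String label;          // e.g. "soft delete records", chosen by the caller
  private final Consumer<String> log;  // stand-in for a real logger
  private int count = 0;
  private boolean reported = false;

  CountingIterator(Iterator<T> delegate, String label, Consumer<String> log) {
    this.delegate = delegate;
    this.label = label;
    this.log = log;
  }

  @Override
  public boolean hasNext() {
    boolean more = delegate.hasNext();
    if (!more && !reported) {
      reported = true; // report only once, however often hasNext() is called afterwards
      log.accept(label + " total: " + count);
    }
    return more;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T item = delegate.next();
    count++;
    return item;
  }
}
```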

----------------------------------------------------------------

[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
   > Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/66e201ceefad1b97fcad83b50f2954e48ef2d0f4?src=pr&el=desc) will **increase** coverage by `0.01%`.
   > The diff coverage is `0%`.
   
   ```diff
   @@             Coverage Diff             @@
   ##             master   #2868      +/-   ##
   ===========================================
   + Coverage     45.79%   45.8%   +0.01%     
   + Complexity     9109    9108       -1     
   ===========================================
     Files          1915    1918       +3     
     Lines         72293   72271      -22     
     Branches       7974    7959      -15     
   ===========================================
     Hits          33106   33106              
   + Misses        36161   36140      -21     
   + Partials       3026    3025       -1
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
   | [...apache/gobblin/salesforce/QueryResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUXVlcnlSZXN1bHRJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [.../apache/gobblin/salesforce/BulkResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvQnVsa1Jlc3VsdEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...n/source/extractor/utils/InputStreamCSVReader.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvdXRpbHMvSW5wdXRTdHJlYW1DU1ZSZWFkZXIuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...n/java/org/apache/gobblin/salesforce/FileIdVO.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvRmlsZUlkVk8uamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...e/gobblin/runtime/locks/ZookeeperBasedJobLock.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9ja3MvWm9va2VlcGVyQmFzZWRKb2JMb2NrLmphdmE=) | `63.33% <0%> (-1.12%)` | `15% <0%> (-1%)` | |
   | [.../apache/gobblin/runtime/api/JobExecutionState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkV4ZWN1dGlvblN0YXRlLmphdmE=) | `79.43% <0%> (-0.94%)` | `24% <0%> (ø)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.22% <0%> (ø)` | `27% <0%> (-1%)` | :arrow_down: |
   | ... and [6 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [66e201c...1181c37](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r367159350
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceRecordIterator.java
 ##########
 @@ -99,7 +103,40 @@ private void fulfillCurrentRecord() {
     }
   }
 
-  private void resetCurrentRecordStatus() {
+  private InputStreamCSVReader reopenStreamWithRetry(ResultStruct resultStruct, int seekLineNumber, int retryNumber, int count) {
 
 Review comment:
   Can you refactor the logic in `SalesforceExtractor.reinitializeBufferedReader` and use it here? That has an additional validation to make sure that after skipping we arrived at the expected location based on the last record read before the failure.
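The validation htran1 describes (reopen, skip, then confirm the position using the last record read before the failure) can be sketched as follows. This is a minimal illustration, not the code in `SalesforceExtractor.reinitializeBufferedReader`. It assumes one record per line (the real reader parses CSV records, which may span lines), and `ReaderFactory` is a hypothetical stand-in for whatever reopens the Salesforce result stream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.Objects;

// Sketch only: assumes one record per line; ReaderFactory is hypothetical.
final class ReopenAndSeek {

  /** Hypothetical factory that opens a fresh reader positioned at the start of the result. */
  interface ReaderFactory {
    BufferedReader open() throws IOException;
  }

  private ReopenAndSeek() {}

  /**
   * Opens a new reader, skips linesAlreadyRead lines, and verifies that the last skipped
   * line equals lastLineRead. Returns a reader positioned at the first unconsumed line.
   */
  static BufferedReader reopenAndSeek(ReaderFactory factory, long linesAlreadyRead, String lastLineRead)
      throws IOException {
    BufferedReader reader = factory.open();
    String lastSkipped = null;
    for (long i = 0; i < linesAlreadyRead; i++) {
      lastSkipped = reader.readLine();
      if (lastSkipped == null) {
        reader.close();
        throw new IOException("Stream ended after " + i + " lines; expected to skip " + linesAlreadyRead);
      }
    }
    // A mismatch means the reopened stream does not line up with what was consumed
    // before the failure, so resuming here could duplicate or drop records.
    if (linesAlreadyRead > 0 && !Objects.equals(lastSkipped, lastLineRead)) {
      reader.close();
      throw new IllegalStateException(
          "Seek mismatch: expected last line [" + lastLineRead + "] but found [" + lastSkipped + "]");
    }
    return reader;
  }
}
```

Failing loudly on a mismatch is the point of the extra check: a plain skip-by-count silently resumes at the wrong place if the reopened result differs from the first read.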


[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator

Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369857200
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -0,0 +1,115 @@
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+  private FileIdVO fileIdVO;
+  private int retryLimit;
+  private BulkConnection conn;
+  private InputStreamCSVReader csvReader;
+  private List<String> header;
+  private int columnSize;
+  private int lineCount = 0; // differs from currentFileRowCount: the csv file has a header line
+  private List<String> preLoadedLine = null;
+
+  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+    log.info("create BulkResultIterator: " + fileIdVO);
+    this.conn = conn;
+    this.fileIdVO = fileIdVO;
+    this.retryLimit = retryLimit;
+  }
+
+  /**
+   * Read the header and the first data record from csvReader.
+   * This is not done in the constructor so that opening the file stream is deferred.
+   */
+  private void initHeader() {
+    this.header = this.nextLine(); // first line is header
+    this.columnSize = this.header.size();
+    this.preLoadedLine = this.nextLine(); // initialize: buffer one record data
+  }
+
+  private List<String> nextLine() {
+    Exception exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        if (this.csvReader == null) {
+          this.csvReader = openAndSeekCsvReader();
+        }
+        List<String> line = this.csvReader.nextRecord();
+        this.lineCount++;
+        return line;
+      } catch (InputStreamCSVReader.CSVParseException e) {
+        throw new RuntimeException(e); // don't retry if it is parse error
+      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
+        exception = e;
+        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = openAndSeekCsvReader();
+      }
+    }
+    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (this.header == null) {
+      initHeader();
+    }
+    return this.preLoadedLine != null;
+  }
+
+  @Override
+  public JsonElement next() {
+    if (this.header == null) {
+      initHeader();
+    }
+    JsonElement jsonObject = Utils.csvToJsonObject(this.header, this.preLoadedLine, this.columnSize);
+    this.preLoadedLine = this.nextLine();
+    if (this.preLoadedLine == null) {
+      log.info("----Record count: [{}] for {}", lineCount - 1, fileIdVO);
+    }
+    return jsonObject;
+  }
+
+  private InputStreamCSVReader openAndSeekCsvReader() {
+    String jobId = fileIdVO.getJobId();
+    String batchId = fileIdVO.getBatchId();
+    String resultId = fileIdVO.getResultId();
+    log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+    closeCsvReader();
+    try {
+      InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+      BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+      csvReader = new InputStreamCSVReader(br);
+      for (int j = 0; j < lineCount; j++) {
+        csvReader.nextRecord(); // skip records that were already read
+      }
+      // failed to skip seekLineNumber lines, try to open the file again. same time count+1
+      return csvReader;
 
 Review comment:
   Added it back. test - https://ltx1-holdemaz05.grid.linkedin.com:8443/executor?execid=22307199&job=salesforce_task_full&attempt=0
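A simplified sketch of the retry pattern in the `BulkResultIterator` hunk above: the header is read lazily, one record is buffered as lookahead, and on an `IOException` the stream is reopened and the lines already consumed are skipped. Assumptions: records are unquoted comma-separated lines read with `BufferedReader.readLine` instead of `InputStreamCSVReader`, only `IOException` is retried, and `ReaderFactory` is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Sketch only: simplified, line-based version of the retry pattern; not Gobblin code.
final class RetryingLineIterator implements Iterator<List<String>> {

  /** Hypothetical factory that opens a fresh reader at the start of the result. */
  interface ReaderFactory {
    BufferedReader open() throws IOException;
  }

  private final ReaderFactory factory;
  private final int retryLimit;
  private BufferedReader reader;   // opened lazily on first read
  private long linesConsumed = 0;  // lines returned so far, header included
  private boolean initialized = false;
  private List<String> header;
  private List<String> preloaded;  // one-record lookahead; null at end of stream

  RetryingLineIterator(ReaderFactory factory, int retryLimit) {
    if (retryLimit < 1) throw new IllegalArgumentException("retryLimit must be >= 1");
    this.factory = factory;
    this.retryLimit = retryLimit;
  }

  @Override
  public boolean hasNext() {
    ensureInitialized();
    return preloaded != null;
  }

  @Override
  public List<String> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    List<String> current = preloaded;
    preloaded = nextLine();
    return current;
  }

  private void ensureInitialized() {
    if (!initialized) {
      initialized = true;
      header = nextLine();                             // first line is the header
      preloaded = header == null ? null : nextLine();  // buffer one data record
    }
  }

  /** Reads one line; on IOException, reopens the stream and skips the consumed lines. */
  private List<String> nextLine() {
    IOException lastFailure = null;
    for (int attempt = 0; attempt < retryLimit; attempt++) {
      try {
        if (reader == null) {
          reader = openAndSkip();
        }
        String line = reader.readLine();
        if (line == null) {
          return null;  // end of stream is not counted as a consumed line
        }
        linesConsumed++;
        return Arrays.asList(line.split(",", -1));
      } catch (IOException e) {
        lastFailure = e;
        closeQuietly();
        reader = null;  // the next attempt reopens and skips
      }
    }
    throw new UncheckedIOException("Failed after " + retryLimit + " attempts", lastFailure);
  }

  private BufferedReader openAndSkip() throws IOException {
    BufferedReader fresh = factory.open();
    for (long i = 0; i < linesConsumed; i++) {
      if (fresh.readLine() == null) {
        fresh.close();
        throw new IOException("Reopened stream is shorter than " + linesConsumed + " lines");
      }
    }
    return fresh;
  }

  private void closeQuietly() {
    try {
      if (reader != null) reader.close();
    } catch (IOException ignored) {
      // best effort: the stream is being discarded anyway
    }
  }
}
```

Counting only lines actually returned, and not the end-of-stream read, keeps the skip count equal to the number of lines already handed to the caller, so a reopen resumes exactly after the last delivered line.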


[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
   > Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/66e201ceefad1b97fcad83b50f2954e48ef2d0f4?src=pr&el=desc) will **decrease** coverage by `41.69%`.
   > The diff coverage is `0%`.
   
   
   ```diff
   @@             Coverage Diff             @@
   ##             master   #2868      +/-   ##
   ===========================================
   - Coverage     45.79%   4.09%   -41.7%     
   + Complexity     9109     748    -8361     
   ===========================================
     Files          1915    1918       +3     
     Lines         72293   72270      -23     
     Branches       7974    7959      -15     
   ===========================================
   - Hits          33106    2962   -30144     
   - Misses        36161   68990   +32829     
   + Partials       3026     318    -2708
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `0% <0%> (-19.82%)` | `0 <0> (-12)` | |
   | [...apache/gobblin/salesforce/QueryResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUXVlcnlSZXN1bHRJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [.../apache/gobblin/salesforce/BulkResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvQnVsa1Jlc3VsdEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...n/source/extractor/utils/InputStreamCSVReader.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvdXRpbHMvSW5wdXRTdHJlYW1DU1ZSZWFkZXIuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...n/java/org/apache/gobblin/salesforce/FileIdVO.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvRmlsZUlkVk8uamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...n/converter/AvroStringFieldDecryptorConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY3J5cHRvLXByb3ZpZGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9BdnJvU3RyaW5nRmllbGREZWNyeXB0b3JDb252ZXJ0ZXIuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
   | [...he/gobblin/cluster/TaskRunnerSuiteThreadModel.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvVGFza1J1bm5lclN1aXRlVGhyZWFkTW9kZWwuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-5%)` | |
   | [...n/mapreduce/avro/AvroKeyCompactorOutputFormat.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL2F2cm8vQXZyb0tleUNvbXBhY3Rvck91dHB1dEZvcm1hdC5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-3%)` | |
   | ... and [1119 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [66e201c...863c6d4](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-gobblin] codecov-io commented on issue #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
   > Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/357d1db84c4601b3e50f7d26a05d8ce282c5159d?src=pr&el=desc) will **decrease** coverage by `41.65%`.
   > The diff coverage is `0%`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2868       +/-   ##
   ============================================
   - Coverage     45.75%    4.1%   -41.66%     
   + Complexity     9103     747     -8356     
   ============================================
     Files          1917    1917               
     Lines         72131   72158       +27     
     Branches       7956    7958        +2     
   ============================================
   - Hits          33003    2961    -30042     
   - Misses        36102   68878    +32776     
   + Partials       3026     319     -2707
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `0% <0%> (-19.82%)` | `0 <0> (-12)` | |
   | [...e/gobblin/salesforce/SalesforceRecordIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVJlY29yZEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
   | [...n/converter/AvroStringFieldDecryptorConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY3J5cHRvLXByb3ZpZGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9BdnJvU3RyaW5nRmllbGREZWNyeXB0b3JDb252ZXJ0ZXIuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
   | [...he/gobblin/cluster/TaskRunnerSuiteThreadModel.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvVGFza1J1bm5lclN1aXRlVGhyZWFkTW9kZWwuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-5%)` | |
   | [...n/mapreduce/avro/AvroKeyCompactorOutputFormat.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL2F2cm8vQXZyb0tleUNvbXBhY3Rvck91dHB1dEZvcm1hdC5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-3%)` | |
   | [...apache/gobblin/fork/CopyNotSupportedException.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZm9yay9Db3B5Tm90U3VwcG9ydGVkRXhjZXB0aW9uLmphdmE=) | `0% <0%> (-100%)` | `0% <0%> (-1%)` | |
   | [.../gobblin/kafka/writer/KafkaWriterCommonConfig.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL3dyaXRlci9LYWZrYVdyaXRlckNvbW1vbkNvbmZpZy5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-7%)` | |
   | [...ker/task/TaskLevelPolicyCheckerBuilderFactory.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3F1YWxpdHljaGVja2VyL3Rhc2svVGFza0xldmVsUG9saWN5Q2hlY2tlckJ1aWxkZXJGYWN0b3J5LmphdmE=) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
   | [...bblin/data/management/copy/AllEqualComparator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvQWxsRXF1YWxDb21wYXJhdG9yLmphdmE=) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
   | ... and [1113 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [357d1db...501d178](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator

Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r367155043
 
 

 ##########
 File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
 ##########
 @@ -26,12 +26,12 @@ private SalesforceConfigurationKeys() {
   public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
 
   // pk-chunking
-  public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
-  public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
+  public static final String BULK_TEST_JOB_ID = "salesforce.bulk.test.job.id";
 
 Review comment:
   Please follow the config naming convention of using '.' only as a namespace separator (with camelCase key names), so this should be "salesforce.bulk.testJobId". Change the other new keys to conform.
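Under the convention the reviewer asks for, '.' separates namespaces and the key name itself is camelCase. A small sketch of the renamed key, assuming `java.util.Properties` as a stand-in for Gobblin's job state; the class name `SalesforceBulkTestKeys` and the `testJobId` helper are hypothetical.

```java
import java.util.Properties;

// Sketch only: hypothetical holder for the renamed key; Properties stands in for job state.
final class SalesforceBulkTestKeys {
  private SalesforceBulkTestKeys() {}

  // Namespace "salesforce.bulk", key name "testJobId" (camelCase, no dots inside the name).
  static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";

  /** Returns the configured test job id, or null when the key is not set. */
  static String testJobId(Properties props) {
    return props.getProperty(BULK_TEST_JOB_ID);
  }
}
```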
