Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/01/14 19:49:07 UTC
[GitHub] [incubator-gobblin] arekusuri opened a new pull request #2868:
GOBBLIN-1025: Add retry for PK-Chuking iterator
arekusuri opened a new pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
### JIRA
https://issues.apache.org/jira/browse/GOBBLIN-1025
### Description
In the SFDC connector there is a class called `ResultIterator` (I will rename it to `SalesforceRecordIterator`).
It is currently used only by PK-Chunking. It wraps the fetching of a list of result files in a single record iterator.
However, `csvReader.nextRecord()` may throw a network I/O exception, and we should retry in that case.
When a network I/O exception happens after a result file has been partly fetched, we are in a special situation: the first part of the file has already been fetched locally, but the rest of the file is still on the data source.
We need to
1. reopen the file stream
2. skip all the records that we have already fetched, moving the cursor to the first record that we have not fetched yet.
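The two steps above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `RecordReader`, `ReopenableSource`, `RetryingRecordIterator`, and the demo class are hypothetical names, the retry limit is applied per record fetch, and a `null` record is assumed to mean end of stream.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical reader: returns the next record, or null at end of stream. */
interface RecordReader<T> {
  T nextRecord() throws IOException;
}

/** Hypothetical source that can be reopened from its first record. */
interface ReopenableSource<T> {
  RecordReader<T> open() throws IOException;
}

/** Sketch: after an IOException, reopen the stream and skip the records already delivered. */
class RetryingRecordIterator<T> implements Iterator<T> {
  private final ReopenableSource<T> source;
  private final int retryLimit;
  private RecordReader<T> reader; // null means "must (re)open before reading"
  private long delivered = 0;     // records already handed to the caller
  private T lookahead;
  private boolean done = false;

  RetryingRecordIterator(ReopenableSource<T> source, int retryLimit) {
    this.source = source;
    this.retryLimit = retryLimit;
  }

  @Override
  public boolean hasNext() {
    if (lookahead == null && !done) {
      lookahead = fetchWithRetry();
      done = (lookahead == null);
    }
    return lookahead != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T record = lookahead;
    lookahead = null;
    delivered++;
    return record;
  }

  private T fetchWithRetry() {
    IOException last = null;
    for (int attempt = 0; attempt <= retryLimit; attempt++) {
      try {
        if (reader == null) {
          reader = source.open();                 // step 1: reopen the file stream
          for (long i = 0; i < delivered; i++) {  // step 2: skip records already fetched
            if (reader.nextRecord() == null) {
              throw new IOException("stream ended while skipping fetched records");
            }
          }
        }
        return reader.nextRecord();
      } catch (IOException e) {
        last = e;
        reader = null; // force a reopen on the next attempt
      }
    }
    throw new RuntimeException("retry limit exceeded", last);
  }
}

class RetryingRecordIteratorDemo {
  public static void main(String[] args) {
    List<String> data = Arrays.asList("a", "b", "c", "d");
    int[] opens = {0};
    // Simulated source: the first stream fails after two records.
    ReopenableSource<String> flaky = () -> {
      int openNo = ++opens[0];
      int[] pos = {0};
      return () -> {
        if (openNo == 1 && pos[0] == 2) {
          throw new IOException("simulated network failure");
        }
        return pos[0] < data.size() ? data.get(pos[0]++) : null;
      };
    };
    List<String> out = new ArrayList<>();
    new RetryingRecordIterator<>(flaky, 3).forEachRemaining(out::add);
    System.out.println(out + " after " + opens[0] + " opens"); // prints: [a, b, c, d] after 2 opens
  }
}
```

Counting delivered records rather than bytes keeps the skip logic independent of how the stream is buffered, at the cost of re-reading the first part of the file on each retry.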
### Tests
https://ltx1-holdemaz05.grid.linkedin.com:8443/executor?execid=21956300&job=salesforce_task_full&attempt=0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372591935
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+ private int recordCount = 0;
+ private SalesforceExtractor extractor;
+ private String schema;
+ private String entity;
+ private WorkUnit workUnit;
+ private List<Predicate> predicateList;
+
+ private Iterator<JsonElement> queryResultIter;
+
+ public QueryResultIterator(
+ SalesforceExtractor extractor,
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+ ) {
+ log.info("create query result iterator.");
+ this.extractor = extractor;
+ this.schema = schema;
+ this.entity = entity;
+ this.workUnit = workUnit;
+ this.predicateList = predicateList;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Soft delete records total:{}", recordCount);
+ }
+ return queryResultIter.hasNext();
+ }
+
+ private void initQueryResultIter() {
+ try {
+ log.info("Pull soft delete records");
+ queryResultIter = extractor.getRecordSet(schema, entity, workUnit, predicateList);
+ } catch (DataRecordException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public JsonElement next() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ recordCount ++;
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Soft delete records total:{}", recordCount);
Review comment:
fixed. thanks
----------------------------------------------------------------
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372554615
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.Iterators;
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Result Iterator.
Review comment:
Please have a less generic comment.
----------------------------------------------------------------
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372595268
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.Iterators;
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Result Iterator.
Review comment:
fixed, thanks!
> * The iterator that chains all result iterators together.
> * It creates a single iterator for a list of Bulk API result files.
> * It can also take other iterators through the `add` function and combine them into one iterator.
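For readers following the thread, the chaining behaviour described in that javadoc could look roughly like the sketch below. It is an assumption-laden illustration, not the `ResultChainingIterator` from this PR: the class name `ChainingIterator` and its queue-based design are hypothetical, and only the `add` function name comes from the comment above.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Sketch: presents several iterators as one, consuming them in the order they were added. */
class ChainingIterator<T> implements Iterator<T> {
  private final Deque<Iterator<T>> pending = new ArrayDeque<>();

  /** Appends another iterator to the end of the chain. */
  void add(Iterator<T> iterator) {
    pending.addLast(iterator);
  }

  @Override
  public boolean hasNext() {
    // Drop exhausted iterators from the front until one has data or none are left.
    while (!pending.isEmpty() && !pending.peekFirst().hasNext()) {
      pending.removeFirst();
    }
    return !pending.isEmpty();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return pending.peekFirst().next();
  }
}

class ChainingIteratorDemo {
  public static void main(String[] args) {
    ChainingIterator<String> chain = new ChainingIterator<>();
    chain.add(Arrays.asList("file1-rec1", "file1-rec2").iterator());
    chain.add(Arrays.<String>asList().iterator()); // an empty result file is skipped
    chain.add(Arrays.asList("file2-rec1").iterator());
    while (chain.hasNext()) {
      System.out.println(chain.next());
    }
  }
}
```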
----------------------------------------------------------------
[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868:
GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
> Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/c7e2c670734ea1009dace621ecb137219476ce29?src=pr&el=desc) will **decrease** coverage by `<.01%`.
> The diff coverage is `0%`.
```diff
@@ Coverage Diff @@
## master #2868 +/- ##
============================================
- Coverage 45.79% 45.78% -0.01%
- Complexity 9108 9110 +2
============================================
Files 1915 1915
Lines 72267 72294 +27
Branches 7969 7971 +2
============================================
+ Hits 33095 33103 +8
- Misses 36149 36166 +17
- Partials 3023 3025 +2
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
| [...e/gobblin/salesforce/SalesforceRecordIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVJlY29yZEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `85.71% <0%> (-7.15%)` | `3% <0%> (ø)` | |
| [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.81% <0%> (-0.47%)` | `28% <0%> (ø)` | |
| [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `62.85% <0%> (+1.42%)` | `4% <0%> (ø)` | :arrow_down: |
| [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `72.22% <0%> (+2.22%)` | `13% <0%> (ø)` | :arrow_down: |
| [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `39.25% <0%> (+6.54%)` | `13% <0%> (+2%)` | :arrow_up: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [c7e2c67...c2b7840](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372589295
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+ private int recordCount = 0;
+ private SalesforceExtractor extractor;
+ private String schema;
+ private String entity;
+ private WorkUnit workUnit;
+ private List<Predicate> predicateList;
+
+ private Iterator<JsonElement> queryResultIter;
+
+ public QueryResultIterator(
+ SalesforceExtractor extractor,
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+ ) {
+ log.info("create query result iterator.");
+ this.extractor = extractor;
+ this.schema = schema;
+ this.entity = entity;
+ this.workUnit = workUnit;
+ this.predicateList = predicateList;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Soft delete records total:{}", recordCount);
+ }
+ return queryResultIter.hasNext();
+ }
+
+ private void initQueryResultIter() {
+ try {
+ log.info("Pull soft delete records");
Review comment:
Good catch! Fixed it. The class is general.
In our case we only use it for soft delete.
QueryResultIterator is an iterator wrapper for the function RestApiExtractor.getRecordSet().
getRecordSet is called in 2 places:
1. soft delete
2. the QueryBasedExtractor.getIterator function (used in simple mode)
----------------------------------------------------------------
[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868:
GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
> Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/66e201ceefad1b97fcad83b50f2954e48ef2d0f4?src=pr&el=desc) will **increase** coverage by `0.02%`.
> The diff coverage is `0%`.
```diff
@@ Coverage Diff @@
## master #2868 +/- ##
============================================
+ Coverage 45.79% 45.81% +0.02%
- Complexity 9109 9111 +2
============================================
Files 1915 1918 +3
Lines 72293 72270 -23
Branches 7974 7959 -15
============================================
+ Hits 33106 33113 +7
+ Misses 36161 36133 -28
+ Partials 3026 3024 -2
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
| [...apache/gobblin/salesforce/QueryResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUXVlcnlSZXN1bHRJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [.../apache/gobblin/salesforce/BulkResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvQnVsa1Jlc3VsdEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...n/source/extractor/utils/InputStreamCSVReader.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvdXRpbHMvSW5wdXRTdHJlYW1DU1ZSZWFkZXIuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...n/java/org/apache/gobblin/salesforce/FileIdVO.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvRmlsZUlkVk8uamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.22% <0%> (ø)` | `27% <0%> (-1%)` | :arrow_down: |
| ... and [6 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [66e201c...863c6d4](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372592294
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
Review comment:
I changed the javadoc:
* Iterator for the REST API query.
* It is a wrapper around
* RestApiExtractor.getRecordSet(schema, entity, workUnit, predicateList).
* The reason we want a wrapper for the function is that
* we want to delay its execution: the data is fetched only when `next` is called.
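The delayed execution described in this javadoc can be illustrated with a small generic sketch. It is not the `QueryResultIterator` from the diff: `LazyIterator` is a hypothetical name, and a `java.util.function.Supplier` stands in for the call to `getRecordSet`. As in the diff, the first call to either `hasNext` or `next` triggers the fetch.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

/** Sketch: defers an expensive fetch until the iterator is first used. */
class LazyIterator<T> implements Iterator<T> {
  private final Supplier<Iterator<T>> fetch; // stands in for the real query call
  private Iterator<T> delegate;              // stays null until first use

  LazyIterator(Supplier<Iterator<T>> fetch) {
    this.fetch = fetch;
  }

  private Iterator<T> delegate() {
    if (delegate == null) {
      delegate = fetch.get(); // the fetch runs once, on first access
    }
    return delegate;
  }

  @Override
  public boolean hasNext() {
    return delegate().hasNext();
  }

  @Override
  public T next() {
    return delegate().next();
  }
}

class LazyIteratorDemo {
  public static void main(String[] args) {
    LazyIterator<String> records = new LazyIterator<>(() -> {
      System.out.println("fetching records now");
      return Arrays.asList("r1", "r2").iterator();
    });
    System.out.println("iterator created, nothing fetched yet");
    while (records.hasNext()) {
      System.out.println(records.next());
    }
  }
}
```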
----------------------------------------------------------------
[GitHub] [incubator-gobblin] asfgit closed pull request #2868: GOBBLIN-1025:
Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868
----------------------------------------------------------------
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369747865
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
##########
@@ -563,92 +547,95 @@ public String getTimestampPredicateCondition(String column, long value, String v
return dataTypeMap;
}
-
private Boolean isPkChunkingFetchDone = false;
- private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+ private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
if (isPkChunkingFetchDone) {
return null; // must return null to represent no more data.
}
+ log.info("----Get records for pk-chunking----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
isPkChunkingFetchDone = true; // set to true, never come here twice.
+ bulkApiLogin();
+ String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+ String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+ List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
+ return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+ }
+
+ private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
+ return Arrays.stream(batchIdResultIdString.split(","))
+ .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+ .collect(Collectors.toList());
+ }
+ private Boolean isBulkFetchDone = false;
+
+ private Iterator<JsonElement> fetchRecordSet(
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+) {
+ if (isBulkFetchDone) {
+ return null; // need to return null to indicate no more data.
+ }
+ isBulkFetchDone = true;
+ log.info("----Get records for bulk batch job----");
try {
- if (!bulkApiLogin()) {
- throw new IllegalArgumentException("Invalid Login");
- }
+ // set finish status to false before starting the bulk job
+ this.setBulkJobFinished(false);
+ this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+ log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
+ List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
Review comment:
Is the `o` in fileIdVoList intentional or is it supposed to be a zero?
----------------------------------------------------------------
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372550130
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+ private int recordCount = 0;
+ private SalesforceExtractor extractor;
+ private String schema;
+ private String entity;
+ private WorkUnit workUnit;
+ private List<Predicate> predicateList;
+
+ private Iterator<JsonElement> queryResultIter;
+
+ public QueryResultIterator(
+ SalesforceExtractor extractor,
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+ ) {
+ log.info("create query result iterator.");
+ this.extractor = extractor;
+ this.schema = schema;
+ this.entity = entity;
+ this.workUnit = workUnit;
+ this.predicateList = predicateList;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Soft delete records total:{}", recordCount);
+ }
+ return queryResultIter.hasNext();
+ }
+
+ private void initQueryResultIter() {
+ try {
+ log.info("Pull soft delete records");
+ queryResultIter = extractor.getRecordSet(schema, entity, workUnit, predicateList);
+ } catch (DataRecordException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public JsonElement next() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ recordCount ++;
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Soft delete records total:{}", recordCount);
Review comment:
Same here. How come this class mentions soft delete?
----------------------------------------------------------------
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369749915
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
##########
@@ -0,0 +1,115 @@
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+ private FileIdVO fileIdVO;
+ private int retryLimit;
+ private BulkConnection conn;
+ private InputStreamCSVReader csvReader;
+ private List<String> header;
+ private int columnSize;
+ private int lineCount = 0; // this is different than currentFileRowCount. cvs file has header
+ private List<String> preLoadedLine = null;
+
+ public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+ log.info("create BulkResultIterator: " + fileIdVO);
+ this.conn = conn;
+ this.fileIdVO = fileIdVO;
+ this.retryLimit = retryLimit;
+ }
+
+ /**
+ * read first data record from cvsReader and initiate header
+ * not supposed to do it in constructor function, for delay creating file stream
+ */
+ private void initHeader() {
+ this.header = this.nextLine(); // first line is header
+ this.columnSize = this.header.size();
+ this.preLoadedLine = this.nextLine(); // initialize: buffer one record data
+ }
+
+ private List<String> nextLine() {
+ Exception exception = null;
+ for (int i = 0; i < retryLimit; i++) {
+ try {
+ if (this.csvReader == null) {
+ this.csvReader = openAndSeekCsvReader();
+ }
+ List<String> line = this.csvReader.nextRecord();
+ this.lineCount++;
+ return line;
+ } catch (InputStreamCSVReader.CSVParseException e) {
+ throw new RuntimeException(e); // don't retry if it is parse error
+ } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
+ exception = e;
+ log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+ this.csvReader = openAndSeekCsvReader();
+ }
+ }
+ throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (this.header == null) {
+ initHeader();
+ }
+ return this.preLoadedLine != null;
+ }
+
+ @Override
+ public JsonElement next() {
+ if (this.header == null) {
+ initHeader();
+ }
+ JsonElement jsonObject = Utils.csvToJsonObject(this.header, this.preLoadedLine, this.columnSize);
+ this.preLoadedLine = this.nextLine();
+ if (this.preLoadedLine == null) {
+ log.info("----Record count: [{}] for {}", lineCount - 1, fileIdVO);
+ }
+ return jsonObject;
+ }
+
+ private InputStreamCSVReader openAndSeekCsvReader() {
+ String jobId = fileIdVO.getJobId();
+ String batchId = fileIdVO.getBatchId();
+ String resultId = fileIdVO.getResultId();
+ log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+ closeCsvReader();
+ try {
+ InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+ csvReader = new InputStreamCSVReader(br);
+ for (int j = 0; j < lineCount; j++) {
+ csvReader.nextRecord(); // skip these record
+ }
+ // failed to skip seekLineNumber lines, try to open the file again. same time count+1
+ return csvReader;
Review comment:
I don't see a sanity check that, after skipping, the position matches the prior position. This was the old logic:
    for (int i = 0; i < recordsToSkip; i++) {
      lastCsvRecord = reader.nextRecord();
    }
    // make sure the last record processed before the error was the last record skipped so that the next
    // unprocessed record is processed in the next call to fetchResultBatch()
    if (recordsToSkip > 0) {
      if (!this.csvRecord.equals(lastCsvRecord)) {
        throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
      }
    }
  }
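For illustration, a skip-then-verify step like the one quoted above could look roughly like this in isolation. It is a sketch under assumptions, not the extractor's code: `SkipAndVerify` and `reopenAndSeek` are hypothetical names, and a `Supplier<Iterator<T>>` stands in for reopening the result stream.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch, not the PR's code: reopen a record source, skip the
// records already consumed, and verify that the last skipped record matches.
class SkipAndVerify {
  static <T> Iterator<T> reopenAndSeek(Supplier<Iterator<T>> opener, int recordsToSkip, T lastRecordRead) {
    Iterator<T> reader = opener.get(); // stand-in for reopening the result stream
    T lastSkipped = null;
    for (int i = 0; i < recordsToSkip; i++) {
      if (!reader.hasNext()) {
        throw new IllegalStateException("Source ended after " + i + " records; expected " + recordsToSkip);
      }
      lastSkipped = reader.next();
    }
    // The last skipped record must equal the last record processed before the
    // failure; otherwise the next read would return the wrong record.
    if (recordsToSkip > 0 && !Objects.equals(lastSkipped, lastRecordRead)) {
      throw new IllegalStateException("Repositioning after reconnecting did not point to the expected record");
    }
    return reader;
  }

  public static void main(String[] args) {
    List<String> file = Arrays.asList("id,name", "1,foo", "2,bar");
    Iterator<String> reader = reopenAndSeek(file::iterator, 2, "1,foo");
    System.out.println(reader.next()); // first record that was not yet processed
  }
}
```

Applying this to the new iterator would presumably require keeping the last record read alongside `lineCount`, so there is something to compare against after the reopen.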
----------------------------------------------------------------
[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868:
GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
> Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/357d1db84c4601b3e50f7d26a05d8ce282c5159d?src=pr&el=desc) will **decrease** coverage by `0.01%`.
> The diff coverage is `0%`.
```diff
@@ Coverage Diff @@
## master #2868 +/- ##
============================================
- Coverage 45.75% 45.73% -0.02%
Complexity 9103 9103
============================================
Files 1917 1917
Lines 72131 72158 +27
Branches 7956 7958 +2
============================================
+ Hits 33003 33004 +1
- Misses 36102 36131 +29
+ Partials 3026 3023 -3
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
| [...e/gobblin/salesforce/SalesforceRecordIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVJlY29yZEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `32.71% <0%> (-2.81%)` | `11% <0%> (-1%)` | |
| [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.81% <0%> (-0.47%)` | `28% <0%> (ø)` | |
| [.../apache/gobblin/runtime/api/JobExecutionState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkV4ZWN1dGlvblN0YXRlLmphdmE=) | `80.37% <0%> (+0.93%)` | `24% <0%> (ø)` | :arrow_down: |
| [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `62.85% <0%> (+1.42%)` | `4% <0%> (ø)` | :arrow_down: |
| [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `72.22% <0%> (+2.22%)` | `13% <0%> (ø)` | :arrow_down: |
| [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0%> (+7.14%)` | `3% <0%> (ø)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [357d1db...501d178](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372548890
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+ private int recordCount = 0;
+ private SalesforceExtractor extractor;
+ private String schema;
+ private String entity;
+ private WorkUnit workUnit;
+ private List<Predicate> predicateList;
+
+ private Iterator<JsonElement> queryResultIter;
+
+ public QueryResultIterator(
+ SalesforceExtractor extractor,
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+ ) {
+ log.info("create query result iterator.");
+ this.extractor = extractor;
+ this.schema = schema;
+ this.entity = entity;
+ this.workUnit = workUnit;
+ this.predicateList = predicateList;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Soft delete records total:{}", recordCount);
+ }
+ return queryResultIter.hasNext();
+ }
+
+ private void initQueryResultIter() {
+ try {
+ log.info("Pull soft delete records");
Review comment:
Is this the right log message?
----------------------------------------------------------------
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369859942
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
##########
@@ -563,92 +547,95 @@ public String getTimestampPredicateCondition(String column, long value, String v
return dataTypeMap;
}
-
private Boolean isPkChunkingFetchDone = false;
- private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+ private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
if (isPkChunkingFetchDone) {
return null; // must return null to represent no more data.
}
+ log.info("----Get records for pk-chunking----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
isPkChunkingFetchDone = true; // set to true, never come here twice.
+ bulkApiLogin();
+ String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+ String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+ List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
+ return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+ }
+
+ private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
+ return Arrays.stream(batchIdResultIdString.split(","))
+ .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+ .collect(Collectors.toList());
+ }
+ private Boolean isBulkFetchDone = false;
+
+ private Iterator<JsonElement> fetchRecordSet(
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+) {
+ if (isBulkFetchDone) {
+ return null; // need to return null to indicate no more data.
+ }
+ isBulkFetchDone = true;
+ log.info("----Get records for bulk batch job----");
try {
- if (!bulkApiLogin()) {
- throw new IllegalArgumentException("Invalid Login");
- }
+ // set finish status to false before starting the bulk job
+ this.setBulkJobFinished(false);
+ this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+ log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
+ List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
Review comment:
FileIdVO: "VO" stands for "Value Object"; it is a plain object used for data transfer.
fileIdVoList: fileIdVOList is hard to read, so I made the O lowercase.
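As a rough illustration of such a value object and of the pair parsing shown in the diff, a minimal sketch might look like this. `FileId` and `FileIdParser` are hypothetical stand-ins (the real `FileIdVO` may differ), and the `batchId:resultId,batchId:resultId` input format is an assumption inferred from the `split(",")` and `split(":")` calls above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the PR's FileIdVO; the real class may differ.
final class FileId {
  private final String jobId;
  private final String batchId;
  private final String resultId;

  FileId(String jobId, String batchId, String resultId) {
    this.jobId = jobId;
    this.batchId = batchId;
    this.resultId = resultId;
  }

  String getJobId() { return jobId; }
  String getBatchId() { return batchId; }
  String getResultId() { return resultId; }

  @Override
  public String toString() {
    return "FileId(jobId=" + jobId + ", batchId=" + batchId + ", resultId=" + resultId + ")";
  }
}

class FileIdParser {
  // Parses "batchId:resultId,batchId:resultId,..." (assumed format) into value objects.
  static List<FileId> parse(String jobId, String batchResultPairs) {
    return Arrays.stream(batchResultPairs.split(","))
        .map(pair -> pair.split(":"))
        .map(parts -> new FileId(jobId, parts[0], parts[1]))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    for (FileId id : parse("job1", "batchA:result1,batchB:result2")) {
      System.out.println(id);
    }
  }
}
```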
----------------------------------------------------------------
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369302945
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceRecordIterator.java
##########
@@ -99,7 +103,40 @@ private void fulfillCurrentRecord() {
}
}
- private void resetCurrentRecordStatus() {
+ private InputStreamCSVReader reopenStreamWithRetry(ResultStruct resultStruct, int seekLineNumber, int retryNumber, int count) {
Review comment:
Thanks! I refactored the code so that both PK-chunking and normal bulk batch jobs use an Iterator.
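One way to picture a shared iterator over several result files is a chaining iterator that opens one per-file iterator at a time. The sketch below is an assumption about the general shape, not the PR's `ResultChainingIterator`: `ChainingIterator`, `ChainingIteratorDemo`, and the `opener` function are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical sketch: walks a list of source IDs and opens one per-source
// iterator at a time, so records from all sources appear as a single stream.
class ChainingIterator<S, T> implements Iterator<T> {
  private final Iterator<S> sources;
  private final Function<S, Iterator<T>> opener;
  private Iterator<T> current; // iterator for the source being read, if any

  ChainingIterator(List<S> sources, Function<S, Iterator<T>> opener) {
    this.sources = sources.iterator();
    this.opener = opener;
  }

  @Override
  public boolean hasNext() {
    // Move past exhausted or empty sources until a record is available.
    while ((current == null || !current.hasNext()) && sources.hasNext()) {
      current = opener.apply(sources.next());
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}

class ChainingIteratorDemo {
  public static void main(String[] args) {
    // Stand-in "files" keyed by ID; the second one is deliberately empty.
    Map<String, List<String>> files = new LinkedHashMap<>();
    files.put("f1", Arrays.asList("a", "b"));
    files.put("f2", Collections.<String>emptyList());
    files.put("f3", Arrays.asList("c"));

    Iterator<String> it = new ChainingIterator<String, String>(
        new ArrayList<>(files.keySet()), id -> files.get(id).iterator());
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}
```

Because each per-file iterator is created only when the previous one is exhausted, at most one file stream needs to be open at a time.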
----------------------------------------------------------------
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r367705299
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
##########
@@ -26,12 +26,12 @@ private SalesforceConfigurationKeys() {
public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
// pk-chunking
- public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
- public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
+ public static final String BULK_TEST_JOB_ID = "salesforce.bulk.test.job.id";
Review comment:
Thanks! Changed to:
```
public static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";
public static final String BULK_TEST_BATCH_ID_LIST = "salesforce.bulk.testBatchIds";
```
----------------------------------------------------------------
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r372553737
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
Review comment:
The name of the class and comments seem to indicate this is for fetching rest api query results in general, but the code below has logging specific to fetching "soft delete records". If this class is specific to soft delete records, then that should be specified in the class name and comment.
----------------------------------------------------------------
[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868:
GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
> Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/66e201ceefad1b97fcad83b50f2954e48ef2d0f4?src=pr&el=desc) will **increase** coverage by `0.01%`.
> The diff coverage is `0%`.
```diff
@@ Coverage Diff @@
## master #2868 +/- ##
===========================================
+ Coverage 45.79% 45.8% +0.01%
+ Complexity 9109 9108 -1
===========================================
Files 1915 1918 +3
Lines 72293 72271 -22
Branches 7974 7959 -15
===========================================
Hits 33106 33106
+ Misses 36161 36140 -21
+ Partials 3026 3025 -1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `19.81% <0%> (ø)` | `12 <0> (ø)` | :arrow_down: |
| [...apache/gobblin/salesforce/QueryResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUXVlcnlSZXN1bHRJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [.../apache/gobblin/salesforce/BulkResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvQnVsa1Jlc3VsdEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...n/source/extractor/utils/InputStreamCSVReader.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvdXRpbHMvSW5wdXRTdHJlYW1DU1ZSZWFkZXIuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...n/java/org/apache/gobblin/salesforce/FileIdVO.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvRmlsZUlkVk8uamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...e/gobblin/runtime/locks/ZookeeperBasedJobLock.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbG9ja3MvWm9va2VlcGVyQmFzZWRKb2JMb2NrLmphdmE=) | `63.33% <0%> (-1.12%)` | `15% <0%> (-1%)` | |
| [.../apache/gobblin/runtime/api/JobExecutionState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkV4ZWN1dGlvblN0YXRlLmphdmE=) | `79.43% <0%> (-0.94%)` | `24% <0%> (ø)` | |
| [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `64.22% <0%> (ø)` | `27% <0%> (-1%)` | :arrow_down: |
| ... and [6 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [66e201c...1181c37](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r367159350
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceRecordIterator.java
##########
@@ -99,7 +103,40 @@ private void fulfillCurrentRecord() {
}
}
- private void resetCurrentRecordStatus() {
+ private InputStreamCSVReader reopenStreamWithRetry(ResultStruct resultStruct, int seekLineNumber, int retryNumber, int count) {
Review comment:
Can you refactor the logic in `SalesforceExtractor.reinitializeBufferedReader` and use it here? That has an additional validation to make sure that after skipping we arrived at the expected location based on the last record read before the failure.
----------------------------------------------------------------
[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369857200
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
##########
@@ -0,0 +1,115 @@
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+ private FileIdVO fileIdVO;
+ private int retryLimit;
+ private BulkConnection conn;
+ private InputStreamCSVReader csvReader;
+ private List<String> header;
+ private int columnSize;
+ private int lineCount = 0; // differs from currentFileRowCount: the CSV file has a header line
+ private List<String> preLoadedLine = null;
+
+ public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+ log.info("create BulkResultIterator: " + fileIdVO);
+ this.conn = conn;
+ this.fileIdVO = fileIdVO;
+ this.retryLimit = retryLimit;
+ }
+
+ /**
+ * Reads the header and the first data record from csvReader.
+ * Not done in the constructor, so that opening the file stream is deferred until first use.
+ */
+ private void initHeader() {
+ this.header = this.nextLine(); // first line is header
+ this.columnSize = this.header.size();
+ this.preLoadedLine = this.nextLine(); // initialize: buffer one record data
+ }
+
+ private List<String> nextLine() {
+ Exception exception = null;
+ for (int i = 0; i < retryLimit; i++) {
+ try {
+ if (this.csvReader == null) {
+ this.csvReader = openAndSeekCsvReader();
+ }
+ List<String> line = this.csvReader.nextRecord();
+ this.lineCount++;
+ return line;
+ } catch (InputStreamCSVReader.CSVParseException e) {
+ throw new RuntimeException(e); // don't retry if it is parse error
+ } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
+ exception = e;
+ log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+ this.csvReader = openAndSeekCsvReader();
+ }
+ }
+ throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (this.header == null) {
+ initHeader();
+ }
+ return this.preLoadedLine != null;
+ }
+
+ @Override
+ public JsonElement next() {
+ if (this.header == null) {
+ initHeader();
+ }
+ JsonElement jsonObject = Utils.csvToJsonObject(this.header, this.preLoadedLine, this.columnSize);
+ this.preLoadedLine = this.nextLine();
+ if (this.preLoadedLine == null) {
+ log.info("----Record count: [{}] for {}", lineCount - 1, fileIdVO);
+ }
+ return jsonObject;
+ }
+
+ private InputStreamCSVReader openAndSeekCsvReader() {
+ String jobId = fileIdVO.getJobId();
+ String batchId = fileIdVO.getBatchId();
+ String resultId = fileIdVO.getResultId();
+ log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+ closeCsvReader();
+ try {
+ InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+ csvReader = new InputStreamCSVReader(br);
+ for (int j = 0; j < lineCount; j++) {
+ csvReader.nextRecord(); // skip records that were already read
+ }
+ // failed to skip seekLineNumber lines, try to open the file again. same time count+1
+ return csvReader;
Review comment:
Added it back. test - https://ltx1-holdemaz05.grid.linkedin.com:8443/executor?execid=22307199&job=salesforce_task_full&attempt=0
[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2868:
GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
> Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/66e201ceefad1b97fcad83b50f2954e48ef2d0f4?src=pr&el=desc) will **decrease** coverage by `41.69%`.
> The diff coverage is `0%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master   #2868       +/-   ##
===========================================
- Coverage     45.79%   4.09%    -41.7%
+ Complexity     9109     748     -8361
===========================================
  Files          1915    1918        +3
  Lines         72293   72270       -23
  Branches       7974    7959       -15
===========================================
- Hits          33106    2962    -30144
- Misses        36161   68990    +32829
+ Partials       3026     318     -2708
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `0% <0%> (-19.82%)` | `0 <0> (-12)` | |
| [...apache/gobblin/salesforce/QueryResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUXVlcnlSZXN1bHRJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [.../apache/gobblin/salesforce/BulkResultIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvQnVsa1Jlc3VsdEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...n/source/extractor/utils/InputStreamCSVReader.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvdXRpbHMvSW5wdXRTdHJlYW1DU1ZSZWFkZXIuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...n/java/org/apache/gobblin/salesforce/FileIdVO.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvRmlsZUlkVk8uamF2YQ==) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...n/converter/AvroStringFieldDecryptorConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY3J5cHRvLXByb3ZpZGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9BdnJvU3RyaW5nRmllbGREZWNyeXB0b3JDb252ZXJ0ZXIuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
| [...he/gobblin/cluster/TaskRunnerSuiteThreadModel.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvVGFza1J1bm5lclN1aXRlVGhyZWFkTW9kZWwuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-5%)` | |
| [...n/mapreduce/avro/AvroKeyCompactorOutputFormat.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL2F2cm8vQXZyb0tleUNvbXBhY3Rvck91dHB1dEZvcm1hdC5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-3%)` | |
| ... and [1119 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [66e201c...863c6d4](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [incubator-gobblin] codecov-io commented on issue #2868:
GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#issuecomment-574354876
# [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=h1) Report
> Merging [#2868](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/357d1db84c4601b3e50f7d26a05d8ce282c5159d?src=pr&el=desc) will **decrease** coverage by `41.65%`.
> The diff coverage is `0%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree)
```diff
@@              Coverage Diff              @@
##             master   #2868       +/-    ##
============================================
- Coverage     45.75%    4.1%   -41.66%
+ Complexity     9103     747     -8356
============================================
  Files          1917    1917
  Lines         72131   72158       +27
  Branches       7956    7958        +2
============================================
- Hits          33003    2961    -30042
- Misses        36102   68878    +32776
+ Partials       3026     319     -2707
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...apache/gobblin/salesforce/SalesforceExtractor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZUV4dHJhY3Rvci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
| [...rg/apache/gobblin/salesforce/SalesforceSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVNvdXJjZS5qYXZh) | `0% <0%> (-19.82%)` | `0 <0> (-12)` | |
| [...e/gobblin/salesforce/SalesforceRecordIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2FsZXNmb3JjZVJlY29yZEl0ZXJhdG9yLmphdmE=) | `0% <0%> (ø)` | `0 <0> (?)` | |
| [...n/converter/AvroStringFieldDecryptorConverter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY3J5cHRvLXByb3ZpZGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9BdnJvU3RyaW5nRmllbGREZWNyeXB0b3JDb252ZXJ0ZXIuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
| [...he/gobblin/cluster/TaskRunnerSuiteThreadModel.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvVGFza1J1bm5lclN1aXRlVGhyZWFkTW9kZWwuamF2YQ==) | `0% <0%> (-100%)` | `0% <0%> (-5%)` | |
| [...n/mapreduce/avro/AvroKeyCompactorOutputFormat.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL2F2cm8vQXZyb0tleUNvbXBhY3Rvck91dHB1dEZvcm1hdC5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-3%)` | |
| [...apache/gobblin/fork/CopyNotSupportedException.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZm9yay9Db3B5Tm90U3VwcG9ydGVkRXhjZXB0aW9uLmphdmE=) | `0% <0%> (-100%)` | `0% <0%> (-1%)` | |
| [.../gobblin/kafka/writer/KafkaWriterCommonConfig.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL3dyaXRlci9LYWZrYVdyaXRlckNvbW1vbkNvbmZpZy5qYXZh) | `0% <0%> (-100%)` | `0% <0%> (-7%)` | |
| [...ker/task/TaskLevelPolicyCheckerBuilderFactory.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3F1YWxpdHljaGVja2VyL3Rhc2svVGFza0xldmVsUG9saWN5Q2hlY2tlckJ1aWxkZXJGYWN0b3J5LmphdmE=) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
| [...bblin/data/management/copy/AllEqualComparator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvQWxsRXF1YWxDb21wYXJhdG9yLmphdmE=) | `0% <0%> (-100%)` | `0% <0%> (-2%)` | |
| ... and [1113 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2868/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=footer). Last update [357d1db...501d178](https://codecov.io/gh/apache/incubator-gobblin/pull/2868?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [incubator-gobblin] htran1 commented on a change in pull request
#2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
Posted by GitBox <gi...@apache.org>.
htran1 commented on a change in pull request #2868: GOBBLIN-1025: Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r367155043
##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
##########
@@ -26,12 +26,12 @@ private SalesforceConfigurationKeys() {
public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
// pk-chunking
- public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
- public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
+ public static final String BULK_TEST_JOB_ID = "salesforce.bulk.test.job.id";
Review comment:
Please follow the config naming convention of using '.' as namespace separators, so this should be "salesforce.bulk.testJobId". Change the other new keys to conform.
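As a rough sketch of that convention: '.' separates namespaces only, and the final segment is a single camelCase name. The class `BulkTestKeys` and the second key below are hypothetical and only illustrate the pattern; the first value is the one suggested in the comment above.

```java
import java.util.Properties;

// Hypothetical holder class, for illustration only (not the PR's SalesforceConfigurationKeys).
final class BulkTestKeys {
  private BulkTestKeys() {}

  // Namespace "salesforce.bulk", leaf name "testJobId" (value suggested in the review).
  static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";

  // Hypothetical sibling key following the same pattern; the real key names may differ.
  static final String BULK_TEST_BATCH_ID_LIST = "salesforce.bulk.testBatchIdList";

  // Looks up the test job id, returning an empty string when the key is not set.
  static String testJobId(Properties props) {
    return props.getProperty(BULK_TEST_JOB_ID, "");
  }
}
```

With this layout, every key under `salesforce.bulk.` belongs to one namespace, whereas "salesforce.bulk.test.job.id" would read as three further nested namespaces.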