Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/01/30 01:30:23 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1025] Add retry for PK-Chunking iterator
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 29cc4bc [GOBBLIN-1025] Add retry for PK-Chunking iterator
29cc4bc is described below
commit 29cc4bc24c45e148bcab36d3e3bd6a71c69d1d21
Author: Alex Li <al...@linkedin.com>
AuthorDate: Wed Jan 29 17:30:16 2020 -0800
[GOBBLIN-1025] Add retry for PK-Chunking iterator
Closes #2868 from arekusuri/master
---
.gitignore | 4 +
.../extractor/utils/InputStreamCSVReader.java | 10 +
.../gobblin/salesforce/BulkResultIterator.java | 146 ++++++++
.../org/apache/gobblin/salesforce/FileIdVO.java | 34 ++
.../gobblin/salesforce/QueryResultIterator.java | 95 ++++++
.../gobblin/salesforce/ResultChainingIterator.java | 73 ++++
.../apache/gobblin/salesforce/ResultIterator.java | 166 ----------
.../salesforce/SalesforceConfigurationKeys.java | 13 +-
.../gobblin/salesforce/SalesforceExtractor.java | 366 +++++----------------
.../gobblin/salesforce/SalesforceSource.java | 23 +-
10 files changed, 467 insertions(+), 463 deletions(-)
diff --git a/.gitignore b/.gitignore
index 5195ebb..ca034f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,3 +73,7 @@ gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
temp/
ligradle/*
+FsDatasetStateStoreTest/
+GobblinHelixTaskTest/
+commit-sequence-store-test/
+gobblin-test-harness/src/test/resources/runtime_test/state_store/
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java
index 1aa0ed5..beea504 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java
@@ -42,6 +42,8 @@ public class InputStreamCSVReader {
private int maxFieldCount;
private boolean atEOF;
+ private BufferedReader bufferedReader;
+
public InputStreamCSVReader(Reader input) {
this(new BufferedReader(input));
}
@@ -90,6 +92,7 @@ public class InputStreamCSVReader {
}
public InputStreamCSVReader(BufferedReader input, char separator, char enclosedChar) {
+ this.bufferedReader = input;
this.separator = separator;
// parser settings for the separator and escape chars
this.parser = new StreamTokenizer(input);
@@ -259,4 +262,11 @@ public class InputStreamCSVReader {
return this.recordNumber;
}
}
+
+ /**
+ * Close the underlying BufferedReader.
+ */
+ public void close() throws IOException {
+ this.bufferedReader.close();
+ }
}
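For context, a minimal usage sketch of InputStreamCSVReader with the new close() method; the class and file name here are hypothetical, and in this commit the reader actually wraps a Salesforce Bulk API result stream:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.List;
import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;

public class CsvReadSketch {
  public static void main(String[] args) throws Exception {
    // hypothetical local file standing in for a query result stream
    InputStreamCSVReader reader =
        new InputStreamCSVReader(new BufferedReader(new FileReader("sample.csv")));
    try {
      List<String> record;
      // nextRecord() returns null at the end of the stream
      while ((record = reader.nextRecord()) != null) {
        System.out.println(record);
      }
    } finally {
      reader.close(); // added in this commit: closes the underlying BufferedReader
    }
  }
}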
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
new file mode 100644
index 0000000..4178193
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+/**
+ * Iterator for fetching result file of Bulk API.
+ */
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+ private FileIdVO fileIdVO;
+ private int retryLimit;
+ private BulkConnection conn;
+ private InputStreamCSVReader csvReader;
+ private List<String> header;
+ private int columnSize;
+ private int lineCount = 0; // not the same as the record count: the csv file includes a header line
+ private List<String> preLoadedLine = null;
+
+ public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+ log.info("create BulkResultIterator: " + fileIdVO);
+ this.conn = conn;
+ this.fileIdVO = fileIdVO;
+ this.retryLimit = retryLimit;
+ }
+
+ /**
+ * Read the header and buffer the first data record from the csvReader.
+ * This is intentionally not done in the constructor, so creation of the file stream is delayed until data is requested.
+ */
+ private void initHeader() {
+ this.header = this.nextLineWithRetry(); // first line is header
+ this.columnSize = this.header.size();
+ this.preLoadedLine = this.nextLineWithRetry(); // initialize: buffer one record data
+ }
+
+ private List<String> nextLineWithRetry() {
+ Exception exception = null;
+ for (int i = 0; i < retryLimit; i++) {
+ try {
+ if (this.csvReader == null) {
+ this.csvReader = openAndSeekCsvReader(null);
+ }
+ List<String> line = this.csvReader.nextRecord();
+ this.lineCount++;
+ return line;
+ } catch (InputStreamCSVReader.CSVParseException e) {
+ throw new RuntimeException(e); // don't retry if it is a parse error
+ } catch (Exception e) { // any other exception may be transient, so retrying may resolve the issue
+ exception = e;
+ log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+ this.csvReader = openAndSeekCsvReader(e);
+ }
+ }
+ throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (this.header == null) {
+ initHeader();
+ }
+ return this.preLoadedLine != null;
+ }
+
+ @Override
+ public JsonElement next() {
+ if (this.header == null) {
+ initHeader();
+ }
+ JsonElement jsonObject = Utils.csvToJsonObject(this.header, this.preLoadedLine, this.columnSize);
+ this.preLoadedLine = this.nextLineWithRetry();
+ if (this.preLoadedLine == null) {
+ log.info("----Record count: [{}] for {}", getRowCount(), fileIdVO);
+ }
+ return jsonObject;
+ }
+
+ private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
+ String jobId = fileIdVO.getJobId();
+ String batchId = fileIdVO.getBatchId();
+ String resultId = fileIdVO.getResultId();
+ log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+ closeCsvReader();
+ try {
+ InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+ csvReader = new InputStreamCSVReader(br);
+ List<String> lastSkippedLine = null;
+ for (int j = 0; j < lineCount; j++) {
+ lastSkippedLine = csvReader.nextRecord(); // skip these records
+ }
+ if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+ // check if last skipped line is same as the line before error
+ throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+ }
+ return csvReader;
+ } catch (Exception e) { // failed to open reader and skip lineCount lines
+ exceptionRetryFor = exceptionRetryFor != null ? exceptionRetryFor : e;
+ throw new RuntimeException("Failed to [" + e.getMessage() + "] - retrying for => ", exceptionRetryFor);
+ }
+ }
+
+ private int getRowCount() {
+ // the first line is the header and the last line is `null`:
+ // csvReader has no hasNext, so the end of the stream is signaled by a null record
+ return lineCount - 2;
+ }
+
+ private void closeCsvReader() {
+ if (this.csvReader != null) {
+ try {
+ this.csvReader.close();
+ } catch (IOException e) {
+ // ignore the exception
+ }
+ }
+ }
+}
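The retry strategy above boils down to: count the lines consumed so far, reopen the stream on a transient failure, skip that many lines, and verify that the last skipped line equals the line buffered before the error. A self-contained sketch of that reopen-and-reposition pattern, with hypothetical names and a plain BufferedReader standing in for the Bulk API stream:

import java.io.BufferedReader;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

public class RepositioningReader {
  private final Supplier<BufferedReader> opener; // reopens the underlying stream from scratch
  private final int retryLimit;
  private BufferedReader reader;
  private int lineCount = 0;      // lines successfully consumed so far
  private String lastLine = null; // last line handed to the caller

  public RepositioningReader(Supplier<BufferedReader> opener, int retryLimit) {
    this.opener = opener;
    this.retryLimit = retryLimit;
  }

  public String nextLineWithRetry() {
    Exception last = null;
    for (int i = 0; i < retryLimit; i++) {
      try {
        if (reader == null) {
          reader = openAndSeek();
        }
        String line = reader.readLine();
        lineCount++;
        lastLine = line;
        return line;
      } catch (IOException e) { // treat as transient: reopen and reposition on the next attempt
        last = e;
        reader = null;
      }
    }
    throw new RuntimeException("Failed after " + retryLimit + " attempts", last);
  }

  private BufferedReader openAndSeek() throws IOException {
    BufferedReader br = opener.get();
    String skipped = null;
    for (int j = 0; j < lineCount; j++) {
      skipped = br.readLine(); // skip lines already consumed
    }
    // same verification idea as openAndSeekCsvReader above
    if (lineCount > 0 && !Objects.equals(skipped, lastLine)) {
      throw new IOException("Repositioning did not land on the expected line");
    }
    return br;
  }
}

As in the real iterator, a failed verification simply counts as another failed attempt.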
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/FileIdVO.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/FileIdVO.java
new file mode 100644
index 0000000..0eb2fd3
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/FileIdVO.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import lombok.Data;
+
+
+/**
+ * Value object identifying a Bulk API result file by jobId, batchId, and resultId.
+ */
+@Data
+public class FileIdVO {
+ private final String jobId;
+ private final String batchId;
+ private final String resultId;
+ public String toString() {
+ return String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, batchId, resultId);
+ }
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
new file mode 100644
index 0000000..56c76ea
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for a REST API query.
+ * It wraps RestApiExtractor.getRecordSet(schema, entity, workUnit, predicateList)
+ * in order to delay execution: the query is issued only when the iterator is first used.
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+ private int recordCount = 0;
+ private SalesforceExtractor extractor;
+ private String schema;
+ private String entity;
+ private WorkUnit workUnit;
+ private List<Predicate> predicateList;
+
+ private Iterator<JsonElement> queryResultIter;
+
+ public QueryResultIterator(
+ SalesforceExtractor extractor,
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+ ) {
+ log.info("create query result iterator.");
+ this.extractor = extractor;
+ this.schema = schema;
+ this.entity = entity;
+ this.workUnit = workUnit;
+ this.predicateList = predicateList;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Rest API query records total:{}", recordCount);
+ }
+ return queryResultIter.hasNext();
+ }
+
+ private void initQueryResultIter() {
+ try {
+ queryResultIter = extractor.getRecordSet(schema, entity, workUnit, predicateList);
+ } catch (DataRecordException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public JsonElement next() {
+ if (queryResultIter == null) {
+ initQueryResultIter();
+ }
+ recordCount++;
+ if (!queryResultIter.hasNext()) {
+ // no more data, print out total
+ log.info("Rest API query records total:{}", recordCount);
+ }
+ return queryResultIter.next();
+ }
+}
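The pattern here, deferring an expensive call until the iterator is first touched, can be expressed generically; a minimal sketch (hypothetical helper, not part of the commit):

import java.util.Iterator;
import java.util.function.Supplier;

public class LazyIterator<T> implements Iterator<T> {
  private final Supplier<Iterator<T>> supplier;
  private Iterator<T> delegate;

  public LazyIterator(Supplier<Iterator<T>> supplier) {
    this.supplier = supplier;
  }

  private Iterator<T> delegate() {
    if (delegate == null) {
      delegate = supplier.get(); // the expensive call happens here, not at construction time
    }
    return delegate;
  }

  @Override public boolean hasNext() { return delegate().hasNext(); }
  @Override public T next() { return delegate().next(); }
}

QueryResultIterator does the same thing with initQueryResultIter(), and additionally counts records so it can log the total once the delegate is exhausted.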
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
new file mode 100644
index 0000000..0ab3629
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.Iterators;
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Iterator that chains all result iterators together,
+ * presenting the list of Bulk API result files as a single iterator.
+ * Additional iterators can be appended with `add` and are folded into the same chain.
+ */
+@Slf4j
+public class ResultChainingIterator implements Iterator<JsonElement> {
+ private Iterator<JsonElement> iter;
+ private int recordCount = 0;
+
+ public ResultChainingIterator(BulkConnection conn, List<FileIdVO> fileIdList, int retryLimit) {
+ Iterator<BulkResultIterator> iterOfFiles = fileIdList.stream().map(x -> new BulkResultIterator(conn, x, retryLimit)).iterator();
+ iter = Iterators.<JsonElement>concat(iterOfFiles);
+ }
+
+ public Iterator<JsonElement> get() {
+ return iter;
+ }
+
+ public void add(Iterator<JsonElement> iter) {
+ this.iter = Iterators.concat(this.iter, iter);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!iter.hasNext()) {
+ // hasNext is false, so all data in this iterator has been fetched;
+ // print out the record total
+ log.info("====Total records: [{}] ====", recordCount);
+ }
+ return iter.hasNext();
+ }
+
+ @Override
+ public JsonElement next() {
+ JsonElement jsonElement = iter.next();
+ recordCount++;
+ if (!iter.hasNext()) {
+ // see hasNext: the caller may not check hasNext and may instead treat next() == null
+ // as the end of the iterator, so print the total here as well
+ log.info("====Total records: [{}] ====", recordCount);
+ }
+ return jsonElement;
+ }
+}
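ResultChainingIterator leans on Guava's Iterators.concat, which has overloads both for individual iterators and for an iterator of iterators; a small usage sketch:

import com.google.common.collect.Iterators;
import java.util.Arrays;
import java.util.Iterator;

public class ConcatSketch {
  public static void main(String[] args) {
    Iterator<String> first = Arrays.asList("a", "b").iterator();
    Iterator<String> second = Arrays.asList("c").iterator();
    // concat(Iterator...) chains individual iterators, as add() does above;
    // concat(Iterator<Iterator<T>>) chains an iterator of iterators,
    // as the constructor does for the list of result files.
    Iterator<String> chained = Iterators.concat(first, second);
    while (chained.hasNext()) {
      System.out.println(chained.next()); // prints a, b, c
    }
  }
}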
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
deleted file mode 100644
index d6b5566..0000000
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.salesforce;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.sforce.async.BulkConnection;
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
-import org.apache.gobblin.source.extractor.utils.Utils;
-
-
-/**
- * Result Iterator.
- * Take jobId and 'batchId:resultId,batchId2:resultId2' as input build a result record iterator.
- */
-@Slf4j
-public class ResultIterator implements Iterator {
- private Iterator<ResultStruct> batchIdResultIdIterator;
- private BulkConnection bulkConnection;
- private InputStreamCSVReader csvReader;
- private List<String> csvHeader;
- private int columnSize;
- private int urlLoadRetryLimit;
- private ResultStruct resultStruct;
- private List<String> currentRecord = null;
- private Boolean isLoadedCurrentRecord = false;
- private int currentFileRowCount = 0;
- private int totalRowCount = 0;
-
- /**
- * constructor
- * need to initiate the reader and currentRecord
- */
- public ResultIterator(BulkConnection bulkConnection, String jobId, String batchIdResultIdString, int urlLoadRetryLimit) {
- this.urlLoadRetryLimit = urlLoadRetryLimit;
- this.bulkConnection = bulkConnection;
- this.batchIdResultIdIterator = this.parsebatchIdResultIdString(jobId, batchIdResultIdString);
- if (this.batchIdResultIdIterator.hasNext()) {
- this.resultStruct = this.batchIdResultIdIterator.next();
- this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct); // first file reader
- } else {
- throw new RuntimeException("No batch-result id found.");
- }
- this.fulfillCurrentRecord();
- this.csvHeader = this.currentRecord;
- this.columnSize = this.csvHeader.size();
- // after fetching cvs header, clean up status
- this.resetCurrentRecordStatus();
- }
-
- /**
- * call reader.next and set up currentRecord
- */
- private void fulfillCurrentRecord() {
- if (this.isLoadedCurrentRecord) {
- return; // skip, since CurrentRecord was loaded.
- }
- try {
- this.currentRecord = this.csvReader.nextRecord();
- if (this.currentRecord == null) { // according InputStreamCSVReader, it returns null at the end of the reader.
- log.info("Fetched {} - rows: {}", this.resultStruct, this.currentFileRowCount); // print out log before switch result file
- this.currentFileRowCount = 0; // clean up
- if (this.batchIdResultIdIterator.hasNext()) { // if there is next file, load next file.
- this.resultStruct = this.batchIdResultIdIterator.next();
- this.csvReader = this.fetchResultsetAsCsvReader(resultStruct);
- this.csvReader.nextRecord(); // read and ignore the csv header.
- this.currentRecord = this.csvReader.nextRecord();
- } else {
- log.info("---- Fetched {} rows -----", this.totalRowCount); // print out log when all records were fetched.
- }
- }
- this.isLoadedCurrentRecord = true;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void resetCurrentRecordStatus() {
- this.currentRecord = null;
- this.isLoadedCurrentRecord = false;
- }
-
- @Override
- public boolean hasNext() {
- this.fulfillCurrentRecord();
- return this.currentRecord != null;
- }
-
- @Override
- public JsonElement next() {
- this.fulfillCurrentRecord();
- List<String> csvRecord = this.currentRecord;
- this.resetCurrentRecordStatus();
- if (csvRecord == null) {
- throw new NoSuchElementException();
- }
- this.currentFileRowCount++;
- this.totalRowCount++;
- JsonObject jsonObject = Utils.csvToJsonObject(this.csvHeader, csvRecord, this.columnSize);
- return jsonObject;
- }
-
- /**
- * resultStruct has all data which identify a result set file
- * fetch it and convert to a csvReader
- */
- private InputStreamCSVReader fetchResultsetAsCsvReader(ResultStruct resultStruct) {
- String jobId = resultStruct.jobId;
- String batchId = resultStruct.batchId;
- String resultId = resultStruct.resultId;
- log.info("PK-Chunking workunit: fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
- for (int i = 0; i < this.urlLoadRetryLimit; i++) { // retries
- try {
- InputStream is = this.bulkConnection.getQueryResultStream(jobId, batchId, resultId);
- BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
- return new InputStreamCSVReader(br);
- } catch (Exception e) { // skip, for retry
- }
- }
- // tried fetchRetryLimit times, always getting exception
- throw new RuntimeException("Tried " + this.urlLoadRetryLimit + " times, but couldn't fetch data.");
- }
-
- /**
- * input string format is "batchId:resultId,batchId2:resultId2"
- * parse it to iterator
- */
- private Iterator<ResultStruct> parsebatchIdResultIdString(String jobId, String batchIdResultIdString) {
- return Arrays.stream(batchIdResultIdString.split(",")).map(x -> x.split(":")).map(x -> new ResultStruct(jobId, x[0], x[1])).iterator();
- }
-
- @Data
- static class ResultStruct {
- private final String jobId;
- private final String batchId;
- private final String resultId;
- public String toString() {
- return String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, batchId, resultId);
- }
- }
-
-}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
index d5fe3f7..6d40421 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.salesforce;
+/**
+ * Configuration keys for the Salesforce connector.
+ */
public final class SalesforceConfigurationKeys {
private SalesforceConfigurationKeys() {
}
@@ -26,15 +29,13 @@ public final class SalesforceConfigurationKeys {
public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
// pk-chunking
- public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
- public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
+ public static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";
+ public static final String BULK_TEST_BATCH_ID_LIST = "salesforce.bulk.testBatchIds";
public static final String SALESFORCE_PARTITION_TYPE = "salesforce.partitionType";
public static final String PARTITION_PK_CHUNKING_SIZE = "salesforce.partition.pkChunkingSize";
- public static final String PK_CHUNKING_JOB_ID = "_salesforce.job.id";
- public static final String PK_CHUNKING_BATCH_RESULT_IDS = "_salesforce.result.ids";
+ public static final String PK_CHUNKING_JOB_ID = "__salesforce.job.id"; // don't use in ini config
+ public static final String PK_CHUNKING_BATCH_RESULT_ID_PAIRS = "__salesforce.batch.result.id.pairs"; // don't use in ini config
public static final int MAX_PK_CHUNKING_SIZE = 250_000; // this number is from SFDC's doc - https://tinyurl.com/ycjvgwv2
public static final int MIN_PK_CHUNKING_SIZE = 20_000;
public static final int DEFAULT_PK_CHUNKING_SIZE = 250_000; // default to max for saving request quota
}
-
-
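For illustration only, the renamed test/debug keys might appear in a job configuration roughly like this (all values hypothetical; the two __salesforce.* keys are set internally on work units and, per the comments above, are not meant for job config):

salesforce.bulk.testJobId=750x000000000001
salesforce.bulk.testBatchIds=751x000000000001,751x000000000002
salesforce.partition.pkChunkingSize=250000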
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 36186e5..4582b0b 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -18,11 +18,8 @@
package org.apache.gobblin.salesforce;
import com.google.common.collect.Iterators;
-import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.FileNotFoundException;
-import java.io.InputStreamReader;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -34,7 +31,9 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
@@ -77,10 +76,7 @@ import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiExtractor;
-import org.apache.gobblin.source.extractor.resultset.RecordSet;
-import org.apache.gobblin.source.extractor.resultset.RecordSetList;
import org.apache.gobblin.source.extractor.schema.Schema;
-import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.extractor.watermark.Predicate;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
@@ -91,7 +87,6 @@ import lombok.extern.slf4j.Slf4j;
import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
-
/**
* An implementation of salesforce extractor for extracting data from SFDC
*/
@@ -108,28 +103,18 @@ public class SalesforceExtractor extends RestApiExtractor {
private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit";
private static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false;
-
private boolean pullStatus = true;
private String nextUrl;
private BulkConnection bulkConnection = null;
- private boolean bulkApiInitialRun = true;
private JobInfo bulkJob = new JobInfo();
- private BufferedReader bulkBufferedReader = null;
- private List<BatchIdAndResultId> bulkResultIdList = Lists.newArrayList();
- private int bulkResultIdCount = 0;
+ private List<BatchIdAndResultId> bulkResultIdList;
private boolean bulkJobFinished = true;
- private List<String> bulkRecordHeader;
- private int bulkResultColumCount;
private boolean newBulkResultSet = true;
- private int bulkRecordCount = 0;
- private int prevBulkRecordCount = 0;
- private List<String> csvRecord;
private final int pkChunkingSize;
private final SalesforceConnector sfConnector;
- private final int fetchRetryLimit;
- private final int batchSize;
+ private final int retryLimit;
private final boolean bulkApiUseQueryAll;
@@ -142,13 +127,7 @@ public class SalesforceExtractor extends RestApiExtractor {
Math.min(MAX_PK_CHUNKING_SIZE, workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE)));
this.bulkApiUseQueryAll = workUnitState.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL);
-
- // Get batch size from .pull file
- int tmpBatchSize = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
- ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);
-
- this.batchSize = tmpBatchSize == 0 ? ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize;
- this.fetchRetryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
+ this.retryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
}
@Override
@@ -563,75 +542,63 @@ public class SalesforceExtractor extends RestApiExtractor {
return dataTypeMap;
}
-
private Boolean isPkChunkingFetchDone = false;
- private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+ private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
if (isPkChunkingFetchDone) {
return null; // must return null to represent no more data.
}
+ log.info("----Get records for pk-chunking----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
isPkChunkingFetchDone = true; // set to true, never come here twice.
+ bulkApiLogin();
+ String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+ String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+ List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
+ return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+ }
+ private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
+ return Arrays.stream(batchIdResultIdString.split(","))
+ .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+ .collect(Collectors.toList());
+ }
+
+ private Boolean isBulkFetchDone = false;
+
+ private Iterator<JsonElement> fetchRecordSet(
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+) {
+ if (isBulkFetchDone) {
+ return null; // need to return null to indicate no more data.
+ }
+ isBulkFetchDone = true;
+ log.info("----Get records for bulk batch job----");
try {
- if (!bulkApiLogin()) {
- throw new IllegalArgumentException("Invalid Login");
- }
+ // set finish status to false before starting the bulk job
+ this.setBulkJobFinished(false);
+ this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+ log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
+ List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
+ .map(x -> new FileIdVO(this.bulkJob.getId(), x.batchId, x.resultId))
+ .collect(Collectors.toList());
+ ResultChainingIterator chainingIter = new ResultChainingIterator(this.bulkConnection, fileIdVoList, this.retryLimit);
+ chainingIter.add(getSoftDeletedRecords(schema, entity, workUnit, predicateList));
+ return chainingIter;
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to get records using bulk api; error - " + e.getMessage(), e);
}
-
- String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
- String batchIdResultIdString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS);
- return new ResultIterator(bulkConnection, jobId, batchIdResultIdString, fetchRetryLimit);
}
@Override
- public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit,
- List<Predicate> predicateList) throws IOException {
+ public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) {
log.debug("Getting salesforce data using bulk api");
-
- // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource
if (workUnit.contains(PK_CHUNKING_JOB_ID)) {
- log.info("----pk-chunking get record set----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
- return getRecordSetPkChunking(workUnit);
- }
- log.info("----bulk get record set----");
- try {
- //Get query result ids in the first run
- //result id is used to construct url while fetching data
- if (this.bulkApiInitialRun) {
- // set finish status to false before starting the bulk job
- this.setBulkJobFinished(false);
- this.bulkResultIdList = getQueryResultIds(entity, predicateList);
- log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
- }
-
- // Get data from input stream
- // If bulk load is not finished, get data from the stream
- // Skip empty result sets since they will cause the extractor to terminate early
- RecordSet<JsonElement> rs = null;
- while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
- rs = getBulkData();
- }
-
- // Set bulkApiInitialRun to false after the completion of first run
- this.bulkApiInitialRun = false;
-
- // If bulk job is finished, get soft deleted records using Rest API
- boolean disableSoftDeletePull = this.workUnit.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED);
- if (rs == null || rs.isEmpty()) { // TODO: Only when the rs is empty, we need to fetch soft deleted records. WHY?
- // Get soft delete records only if IsDeleted column exists and soft deletes pull is not disabled
- if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) {
- log.info("Pull soft delete records");
- return this.getSoftDeletedRecords(schema, entity, workUnit, predicateList);
- }
- log.info("Ignoring soft delete records");
- }
-
- return rs.iterator();
-
- } catch (Exception e) {
- throw new IOException("Failed to get records using bulk api; error - " + e.getMessage(), e);
+ return fetchRecordSetPkChunking(workUnit);
+ } else {
+ return fetchRecordSet(schema, entity, workUnit, predicateList);
}
}
@@ -639,16 +606,31 @@ public class SalesforceExtractor extends RestApiExtractor {
* Get soft deleted records using Rest Api
* @return iterator with deleted records
*/
- private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit,
- List<Predicate> predicateList) throws DataRecordException {
- return this.getRecordSet(schema, entity, workUnit, predicateList);
+ private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList)
+ throws DataRecordException {
+ boolean disableSoftDeletePull = this.workUnit.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED);
+ if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) {
+ return new QueryResultIterator(this, schema, entity, workUnit, predicateList);
+ } else {
+ log.info("Ignoring soft delete records");
+ return null;
+ }
}
+ private void bulkApiLogin() {
+ try {
+ if (!doBulkApiLogin()) {
+ throw new RuntimeException("invalid login");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
/**
* Login to salesforce
* @return login status
*/
- private boolean bulkApiLogin() throws Exception {
+ private boolean doBulkApiLogin() throws Exception {
log.info("Authenticating salesforce bulk api");
boolean success = false;
String hostName = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
@@ -709,20 +691,23 @@ public class SalesforceExtractor extends RestApiExtractor {
/**
* same as getQueryResultIdsPkChunking but the arguments are different.
- * this function can take existing batch ids to return JobIdAndBatchIdResultIdList
+ * this function can take existing batch ids to return ResultFileIdsStruct
* It is for test/debug: developers may want to skip executing the query on SFDC and use a list of existing batch ids instead
*/
- public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
+ public ResultFileIdsStruct getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
+ bulkApiLogin();
try {
- if (!bulkApiLogin()) {
- throw new IllegalArgumentException("Invalid Login");
- }
int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS * 1000, 30 + this.pkChunkingSize * 2);
- if (batchIdListStr != null) {
- log.info("The batchId was specified.");
+ if (StringUtils.isNotEmpty(batchIdListStr)) {
+ log.info("The batchId is specified.");
return retrievePkChunkingResultIdsByBatchId(this.bulkConnection, jobId, batchIdListStr);
} else {
- return retrievePkChunkingResultIds(this.bulkConnection, jobId, retryInterval);
+ ResultFileIdsStruct resultStruct = retrievePkChunkingResultIds(this.bulkConnection, jobId, retryInterval);
+ if (resultStruct.getBatchIdAndResultIdList().isEmpty()) {
+ String msg = String.format("There are no results for [jobId: %s, batchIds: %s]", jobId, batchIdListStr);
+ throw new RuntimeException(msg);
+ }
+ return resultStruct;
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -741,11 +726,9 @@ public class SalesforceExtractor extends RestApiExtractor {
* TODO: abstract this function to a common function: arguments would need to include connection, header, output-format
* TODO: make it and its related functions pure (no side effects); currently it still unnecessarily mutates this.bulkJob
*/
- public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunking(String entity, List<Predicate> predicateList) {
+ public ResultFileIdsStruct getQueryResultIdsPkChunking(String entity, List<Predicate> predicateList) {
+ bulkApiLogin();
try {
- if (!bulkApiLogin()) {
- throw new IllegalArgumentException("Invalid Login");
- }
BulkConnection connection = this.bulkConnection;
JobInfo jobRequest = new JobInfo();
jobRequest.setObject(entity);
@@ -798,8 +781,8 @@ public class SalesforceExtractor extends RestApiExtractor {
log.error("Bulk batch failed: " + batchResponse.toString());
throw new Exception("Failed to get bulk batch info for jobId " + jobId + " error - " + batchResponse.getStateMessage());
}
- JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
- return jobIdAndBatchIdResultIdList;
+ ResultFileIdsStruct resultFileIdsStruct = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
+ return resultFileIdsStruct;
} catch (Exception e) {
throw new RuntimeException("getQueryResultIdsPkChunking: error - " + e.getMessage(), e);
}
@@ -812,10 +795,7 @@ public class SalesforceExtractor extends RestApiExtractor {
* @return iterator with batch of records
*/
private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
- if (!bulkApiLogin()) {
- throw new IllegalArgumentException("Invalid Login");
- }
-
+ bulkApiLogin();
try {
// Set bulk job attributes
this.bulkJob.setObject(entity);
@@ -879,7 +859,6 @@ public class SalesforceExtractor extends RestApiExtractor {
for (BatchInfo bi : batchInfoList.getBatchInfo()) {
QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), bi.getId());
-
for (String result : list.getResult()) {
batchIdAndResultIdList.add(new BatchIdAndResultId(bi.getId(), result));
}
@@ -895,176 +874,6 @@ public class SalesforceExtractor extends RestApiExtractor {
}
}
-
- /**
- * Get a buffered reader wrapping the query result stream for the result with the specified index
- * @param index index the {@link #bulkResultIdList}
- * @return a {@link BufferedReader}
- * @throws AsyncApiException
- */
- private BufferedReader getBulkBufferedReader(int index) throws AsyncApiException {
- return new BufferedReader(new InputStreamReader(
- this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkResultIdList.get(index).getBatchId(),
- this.bulkResultIdList.get(index).getResultId()), ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
- }
-
- /**
- * Fetch records into a {@link RecordSetList} up to the configured batch size {@link #batchSize}. This batch is not
- * the entire Salesforce result batch. It is an internal batch in the extractor for buffering a subset of the result
- * stream that comes from a Salesforce batch for more efficient processing.
- * @param rs the record set to fetch into
- * @param initialRecordCount Initial record count to use. This should correspond to the number of records already in rs.
- * This is used to limit the number of records returned in rs to {@link #batchSize}.
- * @throws DataRecordException
- * @throws IOException
- */
- private void fetchResultBatch(RecordSetList<JsonElement> rs, int initialRecordCount)
- throws DataRecordException, IOException {
- int recordCount = initialRecordCount;
-
- // Stream the resultset through CSV reader to identify columns in each record
- InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
-
- // Get header if it is first run of a new resultset
- if (this.isNewBulkResultSet()) {
- this.bulkRecordHeader = reader.nextRecord();
- this.bulkResultColumCount = this.bulkRecordHeader.size();
- this.setNewBulkResultSet(false);
- }
-
- // Get record from CSV reader stream
- while ((this.csvRecord = reader.nextRecord()) != null) {
- // Convert CSV record to JsonObject
- JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, this.csvRecord, this.bulkResultColumCount);
- rs.add(jsonObject);
- recordCount++;
- this.bulkRecordCount++;
-
- // Insert records in record set until it reaches the batch size
- if (recordCount >= batchSize) {
- log.info("Total number of records processed so far: " + this.bulkRecordCount);
- break;
- }
- }
- }
-
- /**
- * Reinitialize the state of {@link #bulkBufferedReader} to handle network disconnects
- * @throws IOException
- * @throws AsyncApiException
- */
- private void reinitializeBufferedReader() throws IOException, AsyncApiException {
- // close reader and get a new input stream to reconnect to resolve intermittent network errors
- this.bulkBufferedReader.close();
- this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount - 1);
-
- // if the result set is partially processed then we need to skip over processed records
- if (!isNewBulkResultSet()) {
- List<String> lastCsvRecord = null;
- InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
-
- // skip header
- reader.nextRecord();
-
- int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount;
- log.info("Skipping {} records on retry: ", recordsToSkip);
-
- for (int i = 0; i < recordsToSkip; i++) {
- lastCsvRecord = reader.nextRecord();
- }
-
- // make sure the last record processed before the error was the last record skipped so that the next
- // unprocessed record is processed in the next call to fetchResultBatch()
- if (recordsToSkip > 0) {
- if (!this.csvRecord.equals(lastCsvRecord)) {
- throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
- }
- }
- }
- }
-
-
-
- /**
- * Fetch a result batch with retry for network errors
- * @param rs the {@link RecordSetList} to fetch into
- */
- private void fetchResultBatchWithRetry(RecordSetList<JsonElement> rs)
- throws AsyncApiException, DataRecordException, IOException {
- boolean success = false;
- int retryCount = 0;
- int recordCountBeforeFetch = this.bulkRecordCount;
-
- do {
- try {
- // reinitialize the reader to establish a new connection to handle transient network errors
- if (retryCount > 0) {
- reinitializeBufferedReader();
- }
-
- // on retries there may already be records in rs, so pass the number of records as the initial count
- fetchResultBatch(rs, this.bulkRecordCount - recordCountBeforeFetch);
- success = true;
- } catch (IOException e) {
- if (retryCount < this.fetchRetryLimit) {
- log.info("Exception while fetching data, retrying: " + e.getMessage(), e);
- retryCount++;
- } else {
- log.error("Exception while fetching data: " + e.getMessage(), e);
- throw e;
- }
- }
- } while (!success);
- }
-
- /**
- * Get data from the bulk api input stream
- * @return record set with each record as a JsonObject
- */
- private RecordSet<JsonElement> getBulkData() throws DataRecordException {
- log.debug("Processing bulk api batch...");
- RecordSetList<JsonElement> rs = new RecordSetList<>();
-
- try {
- // if Buffer is empty then get stream for the new resultset id
- if (this.bulkBufferedReader == null || !this.bulkBufferedReader.ready()) {
-
- // log the number of records from each result set after it is processed (bulkResultIdCount > 0)
- if (this.bulkResultIdCount > 0) {
- log.info("Result set {} had {} records", this.bulkResultIdCount,
- this.bulkRecordCount - this.prevBulkRecordCount);
- }
-
- // if there is unprocessed resultset id then get result stream for that id
- if (this.bulkResultIdCount < this.bulkResultIdList.size()) {
- log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
- this.setNewBulkResultSet(true);
-
- if (this.bulkBufferedReader != null) {
- this.bulkBufferedReader.close();
- }
-
- this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount);
- this.bulkResultIdCount++;
- this.prevBulkRecordCount = bulkRecordCount;
- } else {
- // if result stream processed for all resultset ids then finish the bulk job
- log.info("Bulk job is finished");
- this.setBulkJobFinished(true);
- return rs;
- }
- }
-
- // fetch a batch of results with retry for network errors
- fetchResultBatchWithRetry(rs);
-
- } catch (Exception e) {
- throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
- }
-
- return rs;
- }
-
@Override
public void closeConnection() throws Exception {
if (this.bulkConnection != null
@@ -1078,12 +887,11 @@ public class SalesforceExtractor extends RestApiExtractor {
return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
}
-
- private JobIdAndBatchIdResultIdList retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
+ private ResultFileIdsStruct retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x -> x.trim()).filter(x -> !x.equals("")).iterator();
try {
List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
- return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
+ return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -1092,7 +900,7 @@ public class SalesforceExtractor extends RestApiExtractor {
/**
* Waits for the PK batches to complete. The wait will stop after all batches are complete or on the first failed batch
*/
- private JobIdAndBatchIdResultIdList retrievePkChunkingResultIds(BulkConnection connection, String jobId, int waitMilliSecond) {
+ private ResultFileIdsStruct retrievePkChunkingResultIds(BulkConnection connection, String jobId, int waitMilliSecond) {
log.info("Waiting for completion of the the bulk job [jobId={}])'s sub queries.", jobId);
try {
while (true) {
@@ -1107,7 +915,7 @@ public class SalesforceExtractor extends RestApiExtractor {
Stream<BatchInfo> stream = Arrays.stream(batchInfos);
Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator();
List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
- return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
+ return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -1177,7 +985,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
@Data
- public static class JobIdAndBatchIdResultIdList {
+ public static class ResultFileIdsStruct {
private final String jobId;
private final List<BatchIdAndResultId> batchIdAndResultIdList;
}
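The "batchId:resultId,batchId2:resultId2" encoding handled by parseBatchIdResultIdString can be illustrated with a self-contained sketch (ids hypothetical):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParseSketch {
  public static void main(String[] args) {
    String jobId = "job1";
    String pairs = "batchA:resultA,batchB:resultB";
    // same split logic as parseBatchIdResultIdString: comma-separated "batchId:resultId" pairs
    List<String> fileIds = Arrays.stream(pairs.split(","))
        .map(x -> x.split(":"))
        .map(x -> String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, x[0], x[1]))
        .collect(Collectors.toList());
    fileIds.forEach(System.out::println);
    // [jobId=job1, batchId=batchA, resultId=resultA]
    // [jobId=job1, batchId=batchB, resultId=resultB]
  }
}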
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 96ad4b7..637df52 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -79,7 +79,7 @@ import lombok.extern.slf4j.Slf4j;
import static org.apache.gobblin.configuration.ConfigurationKeys.*;
import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
-import org.apache.gobblin.salesforce.SalesforceExtractor.JobIdAndBatchIdResultIdList;
+
import org.apache.gobblin.salesforce.SalesforceExtractor.BatchIdAndResultId;
/**
@@ -171,11 +171,11 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
* generate workUnit for pk chunking
*/
private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
- JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = executeQueryWithPkChunking(state, previousWatermark);
- return createWorkUnits(sourceEntity, state, jobIdAndBatchIdResultIdList);
+ SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct = executeQueryWithPkChunking(state, previousWatermark);
+ return createWorkUnits(sourceEntity, state, resultFileIdsStruct);
}
- private JobIdAndBatchIdResultIdList executeQueryWithPkChunking(
+ private SalesforceExtractor.ResultFileIdsStruct executeQueryWithPkChunking(
SourceState sourceState,
long previousWatermark
) throws RuntimeException {
@@ -204,10 +204,10 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
List<Predicate> predicateList = Arrays.asList(predicate);
String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
- if (state.contains(PK_CHUNKING_TEST_JOB_ID)) {
- String jobId = state.getProp(PK_CHUNKING_TEST_JOB_ID, "");
+ if (state.contains(BULK_TEST_JOB_ID)) {
+ String jobId = state.getProp(BULK_TEST_JOB_ID, "");
log.info("---Skip query, fetching result files directly for [jobId={}]", jobId);
- String batchIdListStr = state.getProp(PK_CHUNKING_TEST_BATCH_ID_LIST);
+ String batchIdListStr = state.getProp(BULK_TEST_BATCH_ID_LIST);
return salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, batchIdListStr);
} else {
log.info("---Pk Chunking query submit.");
@@ -226,7 +226,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
private List<WorkUnit> createWorkUnits(
SourceEntity sourceEntity,
SourceState state,
- JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList
+ SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct
) {
String nameSpaceName = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
Extract.TableType tableType = Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase());
@@ -235,7 +235,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
List<WorkUnit> workUnits = Lists.newArrayList();
int partitionNumber = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, 1);
- List<BatchIdAndResultId> batchResultIds = jobIdAndBatchIdResultIdList.getBatchIdAndResultIdList();
+ List<BatchIdAndResultId> batchResultIds = resultFileIdsStruct.getBatchIdAndResultIdList();
int total = batchResultIds.size();
// size of every partition should be: math.ceil(total/partitionNumber), use simpler way: (total+partitionNumber-1)/partitionNumber
@@ -245,10 +245,10 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
for (List<BatchIdAndResultId> resultIds : partitionedResultIds) {
WorkUnit workunit = new WorkUnit(extract);
- String bulkJobId = jobIdAndBatchIdResultIdList.getJobId();
+ String bulkJobId = resultFileIdsStruct.getJobId();
workunit.setProp(PK_CHUNKING_JOB_ID, bulkJobId);
String resultIdStr = resultIds.stream().map(x -> x.getBatchId() + ":" + x.getResultId()).collect(Collectors.joining(","));
- workunit.setProp(PK_CHUNKING_BATCH_RESULT_IDS, resultIdStr);
+ workunit.setProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS, resultIdStr);
workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName());
workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
@@ -771,4 +771,3 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
private int probeCount = 0;
}
}
-
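As a footnote on the partition-size arithmetic above, a quick sketch of the ceiling division, using Guava's Lists.partition (which takes a per-partition size, not a partition count) purely for illustration:

import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionSketch {
  public static void main(String[] args) {
    List<Integer> ids = IntStream.range(0, 10).boxed().collect(Collectors.toList());
    int partitionNumber = 3;
    // ceil(10 / 3) without floating point: (10 + 3 - 1) / 3 = 4 ids per partition
    int sizeOfPartition = (ids.size() + partitionNumber - 1) / partitionNumber;
    List<List<Integer>> partitioned = Lists.partition(ids, sizeOfPartition);
    partitioned.forEach(System.out::println); // [0, 1, 2, 3] [4, 5, 6, 7] [8, 9]
  }
}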