You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/01/30 01:30:23 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1025] Add retry for PK-Chuking iterator[]

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 29cc4bc  [GOBBLIN-1025] Add retry for PK-Chuking iterator[]
29cc4bc is described below

commit 29cc4bc24c45e148bcab36d3e3bd6a71c69d1d21
Author: Alex Li <al...@linkedin.com>
AuthorDate: Wed Jan 29 17:30:16 2020 -0800

    [GOBBLIN-1025] Add retry for PK-Chuking iterator[]
    
    Closes #2868 from arekusuri/master
---
 .gitignore                                         |   4 +
 .../extractor/utils/InputStreamCSVReader.java      |  10 +
 .../gobblin/salesforce/BulkResultIterator.java     | 146 ++++++++
 .../org/apache/gobblin/salesforce/FileIdVO.java    |  34 ++
 .../gobblin/salesforce/QueryResultIterator.java    |  95 ++++++
 .../gobblin/salesforce/ResultChainingIterator.java |  73 ++++
 .../apache/gobblin/salesforce/ResultIterator.java  | 166 ----------
 .../salesforce/SalesforceConfigurationKeys.java    |  13 +-
 .../gobblin/salesforce/SalesforceExtractor.java    | 366 +++++----------------
 .../gobblin/salesforce/SalesforceSource.java       |  23 +-
 10 files changed, 467 insertions(+), 463 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5195ebb..ca034f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,3 +73,7 @@ gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
 
 temp/
 ligradle/*
+FsDatasetStateStoreTest/
+GobblinHelixTaskTest/
+commit-sequence-store-test/
+gobblin-test-harness/src/test/resources/runtime_test/state_store/
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java
index 1aa0ed5..beea504 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/utils/InputStreamCSVReader.java
@@ -42,6 +42,8 @@ public class InputStreamCSVReader {
   private int maxFieldCount;
   private boolean atEOF;
 
+  private BufferedReader bufferedReader;
+
   public InputStreamCSVReader(Reader input) {
     this(new BufferedReader(input));
   }
@@ -90,6 +92,7 @@ public class InputStreamCSVReader {
   }
 
   public InputStreamCSVReader(BufferedReader input, char separator, char enclosedChar) {
+    this.bufferedReader = input;
     this.separator = separator;
     // parser settings for the separator and escape chars
     this.parser = new StreamTokenizer(input);
@@ -259,4 +262,11 @@ public class InputStreamCSVReader {
       return this.recordNumber;
     }
   }
+
+  /**
+   * close the bufferedReader
+   */
+  public void close() throws IOException {
+    this.bufferedReader.close();
+  }
 }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
new file mode 100644
index 0000000..4178193
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+/**
+ * Iterator for fetching result file of Bulk API.
+ */
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+  private FileIdVO fileIdVO;
+  private int retryLimit;
+  private BulkConnection conn;
+  private InputStreamCSVReader csvReader;
+  private List<String> header;
+  private int columnSize;
+  private int lineCount = 0; // this is different than currentFileRowCount. cvs file has header
+  private List<String> preLoadedLine = null;
+
+  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+    log.info("create BulkResultIterator: " + fileIdVO);
+    this.conn = conn;
+    this.fileIdVO = fileIdVO;
+    this.retryLimit = retryLimit;
+  }
+
+  /**
+   * read first data record from cvsReader and initiate header
+   * not supposed to do it in constructor function, for delay creating file stream
+   */
+  private void initHeader() {
+    this.header = this.nextLineWithRetry(); // first line is header
+    this.columnSize = this.header.size();
+    this.preLoadedLine = this.nextLineWithRetry(); // initialize: buffer one record data
+  }
+
+  private List<String> nextLineWithRetry() {
+    Exception exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        if (this.csvReader == null) {
+          this.csvReader = openAndSeekCsvReader(null);
+        }
+        List<String> line = this.csvReader.nextRecord();
+        this.lineCount++;
+        return line;
+      } catch (InputStreamCSVReader.CSVParseException e) {
+        throw new RuntimeException(e); // don't retry if it is parse error
+      } catch (Exception e) { // if it is any other exception, retry may resolve the issue.
+        exception = e;
+        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = openAndSeekCsvReader(e);
+      }
+    }
+    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (this.header == null) {
+      initHeader();
+    }
+    return this.preLoadedLine != null;
+  }
+
+  @Override
+  public JsonElement next() {
+    if (this.header == null) {
+      initHeader();
+    }
+    JsonElement jsonObject = Utils.csvToJsonObject(this.header, this.preLoadedLine, this.columnSize);
+    this.preLoadedLine = this.nextLineWithRetry();
+    if (this.preLoadedLine == null) {
+      log.info("----Record count: [{}] for {}", getRowCount(), fileIdVO);
+    }
+    return jsonObject;
+  }
+
+  private InputStreamCSVReader openAndSeekCsvReader(Exception exceptionRetryFor) {
+    String jobId = fileIdVO.getJobId();
+    String batchId = fileIdVO.getBatchId();
+    String resultId = fileIdVO.getResultId();
+    log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+    closeCsvReader();
+    try {
+      InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+      BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+      csvReader = new InputStreamCSVReader(br);
+      List<String> lastSkippedLine = null;
+      for (int j = 0; j < lineCount; j++) {
+        lastSkippedLine = csvReader.nextRecord(); // skip these records
+      }
+      if ((lastSkippedLine == null && preLoadedLine != null) || (lastSkippedLine != null && !lastSkippedLine.equals(preLoadedLine))) {
+        // check if last skipped line is same as the line before error
+        throw new RuntimeException("Failed to verify last skipped line - retrying for =>", exceptionRetryFor);
+      }
+      return csvReader;
+    } catch (Exception e) { // failed to open reader and skip lineCount lines
+      exceptionRetryFor = exceptionRetryFor != null? exceptionRetryFor : e;
+      throw new RuntimeException("Failed to [" + e.getMessage() + "] - retrying for => " , exceptionRetryFor);
+    }
+  }
+
+  private int getRowCount() {
+    // first line is header, last line is `null`,
+    // because cvsReader doesn't have hasNext to check end of the stream, we will get null as last line
+    return lineCount - 2;
+  }
+
+  private void closeCsvReader() {
+    if (this.csvReader != null) {
+      try {
+        this.csvReader.close();
+      } catch (IOException e) {
+        // ignore the exception
+      }
+    }
+  }
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/FileIdVO.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/FileIdVO.java
new file mode 100644
index 0000000..0eb2fd3
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/FileIdVO.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import lombok.Data;
+
+
+/**
+ * FileIdVO class
+ */
+@Data
+public class FileIdVO {
+  private final String jobId;
+  private final String batchId;
+  private final String resultId;
+  public String toString() {
+    return String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, batchId, resultId);
+  }
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
new file mode 100644
index 0000000..56c76ea
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/QueryResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Iterator for rest api query
+ * It is a wrapper of
+ * RestApiExtractor.getRecordSet(schema, entity, workUnit, predicateList)
+ * the reason why we want to a wrapper for the function is -
+ * We want to delay the execution of the function. Only when the next get called, we fetch the data.
+ */
+@Slf4j
+public class QueryResultIterator implements Iterator<JsonElement> {
+
+  private int recordCount = 0;
+  private SalesforceExtractor extractor;
+  private String schema;
+  private String entity;
+  private WorkUnit workUnit;
+  private List<Predicate> predicateList;
+
+  private Iterator<JsonElement> queryResultIter;
+
+  public QueryResultIterator(
+      SalesforceExtractor extractor,
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+  ) {
+    log.info("create query result iterator.");
+    this.extractor = extractor;
+    this.schema = schema;
+    this.entity = entity;
+    this.workUnit = workUnit;
+    this.predicateList = predicateList;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Rest API query records total:{}", recordCount);
+    }
+    return queryResultIter.hasNext();
+  }
+
+  private void initQueryResultIter() {
+    try {
+      queryResultIter = extractor.getRecordSet(schema, entity, workUnit, predicateList);
+    } catch (DataRecordException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public JsonElement next() {
+    if (queryResultIter == null) {
+      initQueryResultIter();
+    }
+    recordCount ++;
+    if (!queryResultIter.hasNext()) {
+      // no more data, print out total
+      log.info("Rest API query records total:{}", recordCount);
+    }
+    return queryResultIter.next();
+  }
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
new file mode 100644
index 0000000..0ab3629
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultChainingIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.Iterators;
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * The Iterator to chain all result iterators together.
+ * It is to create only one iterator for a list of result files of BulkAPI.
+ * Same time it can also be able to add other iterator with function `add` to combine to 1 iterator
+ */
+@Slf4j
+public class ResultChainingIterator implements Iterator<JsonElement> {
+  private Iterator<JsonElement> iter;
+  private int recordCount = 0;
+
+  public ResultChainingIterator(BulkConnection conn, List<FileIdVO> fileIdList, int retryLimit) {
+    Iterator<BulkResultIterator> iterOfFiles = fileIdList.stream().map(x -> new BulkResultIterator(conn, x, retryLimit)).iterator();
+    iter = Iterators.<JsonElement>concat(iterOfFiles);
+  }
+
+  public Iterator<JsonElement> get() {
+    return iter;
+  }
+
+  public void add(Iterator<JsonElement> iter) {
+    this.iter = Iterators.concat(this.iter, iter);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!iter.hasNext()) {
+      // hasNext is false, means all data in this iterator was fetched
+      // we can print out record total.
+      log.info("====Total records: [{}] ====", recordCount);
+    }
+    return iter.hasNext();
+  }
+
+  @Override
+  public JsonElement next() {
+    JsonElement jsonElement = iter.next();
+    recordCount ++;
+    if (!iter.hasNext()) {
+      // see hasNext.
+      //In case caller may not check hasNext and use next() == null as end of the interator
+      // we can print out total here.
+      log.info("====Total records: [{}] ====", recordCount);
+    }
+    return jsonElement;
+  }
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
deleted file mode 100644
index d6b5566..0000000
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.salesforce;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.sforce.async.BulkConnection;
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
-import org.apache.gobblin.source.extractor.utils.Utils;
-
-
-/**
- * Result Iterator.
- * Take jobId and 'batchId:resultId,batchId2:resultId2' as input build a result record iterator.
- */
-@Slf4j
-public class ResultIterator implements Iterator {
-  private Iterator<ResultStruct> batchIdResultIdIterator;
-  private BulkConnection bulkConnection;
-  private InputStreamCSVReader csvReader;
-  private List<String> csvHeader;
-  private int columnSize;
-  private int urlLoadRetryLimit;
-  private ResultStruct resultStruct;
-  private List<String> currentRecord = null;
-  private Boolean isLoadedCurrentRecord = false;
-  private int currentFileRowCount = 0;
-  private int totalRowCount = 0;
-
-  /**
-   *  constructor
-   *  need to initiate the reader and currentRecord
-   */
-  public ResultIterator(BulkConnection bulkConnection, String jobId, String batchIdResultIdString, int urlLoadRetryLimit) {
-    this.urlLoadRetryLimit = urlLoadRetryLimit;
-    this.bulkConnection = bulkConnection;
-    this.batchIdResultIdIterator = this.parsebatchIdResultIdString(jobId, batchIdResultIdString);
-    if (this.batchIdResultIdIterator.hasNext()) {
-      this.resultStruct = this.batchIdResultIdIterator.next();
-      this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct); // first file reader
-    } else {
-      throw new RuntimeException("No batch-result id found.");
-    }
-    this.fulfillCurrentRecord();
-    this.csvHeader = this.currentRecord;
-    this.columnSize = this.csvHeader.size();
-    // after fetching cvs header, clean up status
-    this.resetCurrentRecordStatus();
-  }
-
-  /**
-   * call reader.next and set up currentRecord
-   */
-  private void fulfillCurrentRecord() {
-    if (this.isLoadedCurrentRecord) {
-      return; // skip, since CurrentRecord was loaded.
-    }
-    try {
-      this.currentRecord = this.csvReader.nextRecord();
-      if (this.currentRecord == null) { // according InputStreamCSVReader, it returns null at the end of the reader.
-        log.info("Fetched {} - rows: {}", this.resultStruct, this.currentFileRowCount); // print out log before switch result file
-        this.currentFileRowCount = 0; // clean up
-        if (this.batchIdResultIdIterator.hasNext()) { // if there is next file, load next file.
-          this.resultStruct = this.batchIdResultIdIterator.next();
-          this.csvReader = this.fetchResultsetAsCsvReader(resultStruct);
-          this.csvReader.nextRecord(); // read and ignore the csv header.
-          this.currentRecord = this.csvReader.nextRecord();
-        } else {
-          log.info("---- Fetched {} rows -----", this.totalRowCount); // print out log when all records were fetched.
-        }
-      }
-      this.isLoadedCurrentRecord = true;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void resetCurrentRecordStatus() {
-    this.currentRecord = null;
-    this.isLoadedCurrentRecord = false;
-  }
-
-  @Override
-  public boolean hasNext() {
-    this.fulfillCurrentRecord();
-    return this.currentRecord != null;
-  }
-
-  @Override
-  public JsonElement next() {
-    this.fulfillCurrentRecord();
-    List<String> csvRecord = this.currentRecord;
-    this.resetCurrentRecordStatus();
-    if (csvRecord == null) {
-      throw new NoSuchElementException();
-    }
-    this.currentFileRowCount++;
-    this.totalRowCount++;
-    JsonObject jsonObject = Utils.csvToJsonObject(this.csvHeader, csvRecord, this.columnSize);
-    return jsonObject;
-  }
-
-  /**
-   * resultStruct has all data which identify a result set file
-   * fetch it and convert to a csvReader
-   */
-  private InputStreamCSVReader fetchResultsetAsCsvReader(ResultStruct resultStruct) {
-    String jobId = resultStruct.jobId;
-    String batchId = resultStruct.batchId;
-    String resultId = resultStruct.resultId;
-    log.info("PK-Chunking workunit: fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
-    for (int i = 0; i < this.urlLoadRetryLimit; i++) { // retries
-      try {
-        InputStream is = this.bulkConnection.getQueryResultStream(jobId, batchId, resultId);
-        BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
-        return new InputStreamCSVReader(br);
-      } catch (Exception e) { // skip, for retry
-      }
-    }
-    // tried fetchRetryLimit times, always getting exception
-    throw new RuntimeException("Tried " + this.urlLoadRetryLimit + " times, but couldn't fetch data.");
-  }
-
-  /**
-   * input string format is "batchId:resultId,batchId2:resultId2"
-   * parse it to iterator
-   */
-  private Iterator<ResultStruct> parsebatchIdResultIdString(String jobId, String batchIdResultIdString) {
-    return Arrays.stream(batchIdResultIdString.split(",")).map(x -> x.split(":")).map(x -> new ResultStruct(jobId, x[0], x[1])).iterator();
-  }
-
-  @Data
-  static class ResultStruct {
-    private final String jobId;
-    private final String batchId;
-    private final String resultId;
-    public String toString() {
-      return String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, batchId, resultId);
-    }
-  }
-
-}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
index d5fe3f7..6d40421 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.salesforce;
 
+/**
+ * SalesforceConfigurationKeys
+ */
 public final class SalesforceConfigurationKeys {
   private SalesforceConfigurationKeys() {
   }
@@ -26,15 +29,13 @@ public final class SalesforceConfigurationKeys {
   public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
 
   // pk-chunking
-  public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
-  public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
+  public static final String BULK_TEST_JOB_ID = "salesforce.bulk.testJobId";
+  public static final String BULK_TEST_BATCH_ID_LIST = "salesforce.bulk.testBatchIds";
   public static final String SALESFORCE_PARTITION_TYPE = "salesforce.partitionType";
   public static final String PARTITION_PK_CHUNKING_SIZE = "salesforce.partition.pkChunkingSize";
-  public static final String PK_CHUNKING_JOB_ID = "_salesforce.job.id";
-  public static final String PK_CHUNKING_BATCH_RESULT_IDS = "_salesforce.result.ids";
+  public static final String PK_CHUNKING_JOB_ID = "__salesforce.job.id"; // don't use in ini config
+  public static final String PK_CHUNKING_BATCH_RESULT_ID_PAIRS = "__salesforce.batch.result.id.pairs"; // don't use in ini config
   public static final int MAX_PK_CHUNKING_SIZE = 250_000; // this number is from SFDC's doc - https://tinyurl.com/ycjvgwv2
   public static final int MIN_PK_CHUNKING_SIZE = 20_000;
   public static final int DEFAULT_PK_CHUNKING_SIZE = 250_000; // default to max for saving request quota
 }
-
-
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 36186e5..4582b0b 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -18,11 +18,8 @@
 package org.apache.gobblin.salesforce;
 
 import com.google.common.collect.Iterators;
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.FileNotFoundException;
-import java.io.InputStreamReader;
 import java.net.URI;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -34,7 +31,9 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.methods.HttpGet;
@@ -77,10 +76,7 @@ import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiExtractor;
-import org.apache.gobblin.source.extractor.resultset.RecordSet;
-import org.apache.gobblin.source.extractor.resultset.RecordSetList;
 import org.apache.gobblin.source.extractor.schema.Schema;
-import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
 import org.apache.gobblin.source.extractor.utils.Utils;
 import org.apache.gobblin.source.extractor.watermark.Predicate;
 import org.apache.gobblin.source.extractor.watermark.WatermarkType;
@@ -91,7 +87,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
 
-
 /**
  * An implementation of salesforce extractor for extracting data from SFDC
  */
@@ -108,28 +103,18 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit";
   private static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false;
 
-
   private boolean pullStatus = true;
   private String nextUrl;
 
   private BulkConnection bulkConnection = null;
-  private boolean bulkApiInitialRun = true;
   private JobInfo bulkJob = new JobInfo();
-  private BufferedReader bulkBufferedReader = null;
-  private List<BatchIdAndResultId> bulkResultIdList = Lists.newArrayList();
-  private int bulkResultIdCount = 0;
+  private List<BatchIdAndResultId> bulkResultIdList;
   private boolean bulkJobFinished = true;
-  private List<String> bulkRecordHeader;
-  private int bulkResultColumCount;
   private boolean newBulkResultSet = true;
-  private int bulkRecordCount = 0;
-  private int prevBulkRecordCount = 0;
-  private List<String> csvRecord;
 
   private final int pkChunkingSize;
   private final SalesforceConnector sfConnector;
-  private final int fetchRetryLimit;
-  private final int batchSize;
+  private final int retryLimit;
 
   private final boolean bulkApiUseQueryAll;
 
@@ -142,13 +127,7 @@ public class SalesforceExtractor extends RestApiExtractor {
             Math.min(MAX_PK_CHUNKING_SIZE, workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE)));
 
     this.bulkApiUseQueryAll = workUnitState.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL);
-
-    // Get batch size from .pull file
-    int tmpBatchSize = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
-        ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);
-
-    this.batchSize = tmpBatchSize == 0 ? ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize;
-    this.fetchRetryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
+    this.retryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
   }
 
   @Override
@@ -563,75 +542,63 @@ public class SalesforceExtractor extends RestApiExtractor {
     return dataTypeMap;
   }
 
-
   private Boolean isPkChunkingFetchDone = false;
 
-  private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+  private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
     if (isPkChunkingFetchDone) {
       return null; // must return null to represent no more data.
     }
+    log.info("----Get records for pk-chunking----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
     isPkChunkingFetchDone = true; // set to true, never come here twice.
+    bulkApiLogin();
+    String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+    String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+    List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
+    return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+  }
 
+  private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
+    return Arrays.stream(batchIdResultIdString.split(","))
+        .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+        .collect(Collectors.toList());
+  }
+
+  private Boolean isBulkFetchDone = false;
+
+  private Iterator<JsonElement> fetchRecordSet(
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+) {
+    if (isBulkFetchDone) {
+      return null; // need to return null to indicate no more data.
+    }
+    isBulkFetchDone = true;
+    log.info("----Get records for bulk batch job----");
     try {
-      if (!bulkApiLogin()) {
-        throw new IllegalArgumentException("Invalid Login");
-      }
+      // set finish status to false before starting the bulk job
+      this.setBulkJobFinished(false);
+      this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+      log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
+      List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
+          .map(x -> new FileIdVO(this.bulkJob.getId(), x.batchId, x.resultId))
+          .collect(Collectors.toList());
+      ResultChainingIterator chainingIter = new ResultChainingIterator(this.bulkConnection, fileIdVoList, this.retryLimit);
+      chainingIter.add(getSoftDeletedRecords(schema, entity, workUnit, predicateList));
+      return chainingIter;
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      throw new RuntimeException("Failed to get records using bulk api; error - " + e.getMessage(), e);
     }
-
-    String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
-    String batchIdResultIdString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS);
-    return new ResultIterator(bulkConnection, jobId, batchIdResultIdString, fetchRetryLimit);
   }
 
   @Override
-  public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit,
-      List<Predicate> predicateList) throws IOException {
+  public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) {
     log.debug("Getting salesforce data using bulk api");
-
-    // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource
     if (workUnit.contains(PK_CHUNKING_JOB_ID)) {
-      log.info("----pk-chunking get record set----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
-      return getRecordSetPkChunking(workUnit);
-    }
-    log.info("----bulk get record set----");
-    try {
-      //Get query result ids in the first run
-      //result id is used to construct url while fetching data
-      if (this.bulkApiInitialRun) {
-        // set finish status to false before starting the bulk job
-        this.setBulkJobFinished(false);
-        this.bulkResultIdList = getQueryResultIds(entity, predicateList);
-        log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
-      }
-
-      // Get data from input stream
-      // If bulk load is not finished, get data from the stream
-      // Skip empty result sets since they will cause the extractor to terminate early
-      RecordSet<JsonElement> rs = null;
-      while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
-        rs = getBulkData();
-      }
-
-      // Set bulkApiInitialRun to false after the completion of first run
-      this.bulkApiInitialRun = false;
-
-      // If bulk job is finished, get soft deleted records using Rest API
-      boolean disableSoftDeletePull = this.workUnit.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED);
-      if (rs == null || rs.isEmpty()) { // TODO: Only when the rs is empty, we need to fetch soft deleted records. WHY?
-        // Get soft delete records only if IsDeleted column exists and soft deletes pull is not disabled
-        if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) {
-          log.info("Pull soft delete records");
-          return this.getSoftDeletedRecords(schema, entity, workUnit, predicateList);
-        }
-        log.info("Ignoring soft delete records");
-      }
-
-      return rs.iterator();
-
-    } catch (Exception e) {
-      throw new IOException("Failed to get records using bulk api; error - " + e.getMessage(), e);
+      return fetchRecordSetPkChunking(workUnit);
+    } else {
+      return fetchRecordSet(schema, entity, workUnit, predicateList);
     }
   }
 
@@ -639,16 +606,31 @@ public class SalesforceExtractor extends RestApiExtractor {
    * Get soft deleted records using Rest Api
    * @return iterator with deleted records
    */
-  private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit,
-      List<Predicate> predicateList) throws DataRecordException {
-    return this.getRecordSet(schema, entity, workUnit, predicateList);
+  private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList)
+      throws DataRecordException {
+    boolean disableSoftDeletePull = this.workUnit.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED);
+    if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) {
+      return new QueryResultIterator(this, schema, entity, workUnit, predicateList);
+    } else {
+      log.info("Ignoring soft delete records");
+      return null;
+    }
   }
 
+  private void bulkApiLogin() {
+    try {
+      if (!doBulkApiLogin()) {
+        throw new RuntimeException("invalid login");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
   /**
    * Login to salesforce
    * @return login status
    */
-  private boolean bulkApiLogin() throws Exception {
+  private boolean doBulkApiLogin() throws Exception {
     log.info("Authenticating salesforce bulk api");
     boolean success = false;
     String hostName = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
@@ -709,20 +691,23 @@ public class SalesforceExtractor extends RestApiExtractor {
 
   /**
    * same as getQueryResultIdsPkChunking but the arguments are different.
-   * this function can take existing batch ids to return JobIdAndBatchIdResultIdList
+   * this function can take existing batch ids to return ResultFileIdsStruct
    * It is for test/debug. developers may want to skip execute query on SFDC, use a list of existing batch ids
    */
-  public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
+  public ResultFileIdsStruct getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
+    bulkApiLogin();
     try {
-      if (!bulkApiLogin()) {
-        throw new IllegalArgumentException("Invalid Login");
-      }
       int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS * 1000, 30 + this.pkChunkingSize  * 2);
-      if (batchIdListStr != null) {
-        log.info("The batchId was specified.");
+      if (StringUtils.isNotEmpty(batchIdListStr)) {
+        log.info("The batchId is specified.");
         return retrievePkChunkingResultIdsByBatchId(this.bulkConnection, jobId, batchIdListStr);
       } else {
-        return retrievePkChunkingResultIds(this.bulkConnection, jobId, retryInterval);
+        ResultFileIdsStruct resultStruct = retrievePkChunkingResultIds(this.bulkConnection, jobId, retryInterval);
+        if (resultStruct.getBatchIdAndResultIdList().isEmpty()) {
+          String msg = String.format("There are no result for the [jobId: %s, batchIds: %s]", jobId, batchIdListStr);
+          throw new RuntimeException(msg);
+        }
+        return resultStruct;
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -741,11 +726,9 @@ public class SalesforceExtractor extends RestApiExtractor {
    *  TODO: abstract this function to a common function: arguments need to add connetion, header, output-format
    *  TODO: make it and its related functions pure function (no side effect). Currently still unnecesarily changing this.bulkJob)
    */
-  public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunking(String entity, List<Predicate> predicateList) {
+  public ResultFileIdsStruct getQueryResultIdsPkChunking(String entity, List<Predicate> predicateList) {
+    bulkApiLogin();
     try {
-      if (!bulkApiLogin()) {
-        throw new IllegalArgumentException("Invalid Login");
-      }
       BulkConnection connection = this.bulkConnection;
       JobInfo jobRequest = new JobInfo();
       jobRequest.setObject(entity);
@@ -798,8 +781,8 @@ public class SalesforceExtractor extends RestApiExtractor {
         log.error("Bulk batch failed: " + batchResponse.toString());
         throw new Exception("Failed to get bulk batch info for jobId " + jobId + " error - " + batchResponse.getStateMessage());
       }
-      JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
-      return jobIdAndBatchIdResultIdList;
+      ResultFileIdsStruct resultFileIdsStruct = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
+      return resultFileIdsStruct;
     } catch (Exception e) {
       throw new RuntimeException("getQueryResultIdsPkChunking: error - " + e.getMessage(), e);
     }
@@ -812,10 +795,7 @@ public class SalesforceExtractor extends RestApiExtractor {
    * @return iterator with batch of records
    */
   private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
-    if (!bulkApiLogin()) {
-      throw new IllegalArgumentException("Invalid Login");
-    }
-
+    bulkApiLogin();
     try {
       // Set bulk job attributes
       this.bulkJob.setObject(entity);
@@ -879,7 +859,6 @@ public class SalesforceExtractor extends RestApiExtractor {
 
       for (BatchInfo bi : batchInfoList.getBatchInfo()) {
         QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), bi.getId());
-
         for (String result : list.getResult()) {
           batchIdAndResultIdList.add(new BatchIdAndResultId(bi.getId(), result));
         }
@@ -895,176 +874,6 @@ public class SalesforceExtractor extends RestApiExtractor {
     }
   }
 
-
-  /**
-   * Get a buffered reader wrapping the query result stream for the result with the specified index
-   * @param index index the {@link #bulkResultIdList}
-   * @return a {@link BufferedReader}
-   * @throws AsyncApiException
-   */
-  private BufferedReader getBulkBufferedReader(int index) throws AsyncApiException {
-    return new BufferedReader(new InputStreamReader(
-        this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkResultIdList.get(index).getBatchId(),
-            this.bulkResultIdList.get(index).getResultId()), ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
-  }
-
-  /**
-   * Fetch records into a {@link RecordSetList} up to the configured batch size {@link #batchSize}. This batch is not
-   * the entire Salesforce result batch. It is an internal batch in the extractor for buffering a subset of the result
-   * stream that comes from a Salesforce batch for more efficient processing.
-   * @param rs the record set to fetch into
-   * @param initialRecordCount Initial record count to use. This should correspond to the number of records already in rs.
-   *                           This is used to limit the number of records returned in rs to {@link #batchSize}.
-   * @throws DataRecordException
-   * @throws IOException
-   */
-  private void fetchResultBatch(RecordSetList<JsonElement> rs, int initialRecordCount)
-      throws DataRecordException, IOException {
-    int recordCount = initialRecordCount;
-
-    // Stream the resultset through CSV reader to identify columns in each record
-    InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
-
-    // Get header if it is first run of a new resultset
-    if (this.isNewBulkResultSet()) {
-      this.bulkRecordHeader = reader.nextRecord();
-      this.bulkResultColumCount = this.bulkRecordHeader.size();
-      this.setNewBulkResultSet(false);
-    }
-
-    // Get record from CSV reader stream
-    while ((this.csvRecord = reader.nextRecord()) != null) {
-      // Convert CSV record to JsonObject
-      JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, this.csvRecord, this.bulkResultColumCount);
-      rs.add(jsonObject);
-      recordCount++;
-      this.bulkRecordCount++;
-
-      // Insert records in record set until it reaches the batch size
-      if (recordCount >= batchSize) {
-        log.info("Total number of records processed so far: " + this.bulkRecordCount);
-        break;
-      }
-    }
-  }
-
-  /**
-   * Reinitialize the state of {@link #bulkBufferedReader} to handle network disconnects
-   * @throws IOException
-   * @throws AsyncApiException
-   */
-  private void reinitializeBufferedReader() throws IOException, AsyncApiException {
-    // close reader and get a new input stream to reconnect to resolve intermittent network errors
-    this.bulkBufferedReader.close();
-    this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount - 1);
-
-    // if the result set is partially processed then we need to skip over processed records
-    if (!isNewBulkResultSet()) {
-      List<String> lastCsvRecord = null;
-      InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader);
-
-      // skip header
-      reader.nextRecord();
-
-      int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount;
-      log.info("Skipping {} records on retry: ", recordsToSkip);
-
-      for (int i = 0; i < recordsToSkip; i++) {
-        lastCsvRecord = reader.nextRecord();
-      }
-
-      // make sure the last record processed before the error was the last record skipped so that the next
-      // unprocessed record is processed in the next call to fetchResultBatch()
-      if (recordsToSkip > 0) {
-        if (!this.csvRecord.equals(lastCsvRecord)) {
-          throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
-        }
-      }
-    }
-  }
-
-
-
-  /**
-   * Fetch a result batch with retry for network errors
-   * @param rs the {@link RecordSetList} to fetch into
-   */
-  private void fetchResultBatchWithRetry(RecordSetList<JsonElement> rs)
-      throws AsyncApiException, DataRecordException, IOException {
-    boolean success = false;
-    int retryCount = 0;
-    int recordCountBeforeFetch = this.bulkRecordCount;
-
-    do {
-      try {
-        // reinitialize the reader to establish a new connection to handle transient network errors
-        if (retryCount > 0) {
-          reinitializeBufferedReader();
-        }
-
-        // on retries there may already be records in rs, so pass the number of records as the initial count
-        fetchResultBatch(rs, this.bulkRecordCount - recordCountBeforeFetch);
-        success = true;
-      } catch (IOException e) {
-        if (retryCount < this.fetchRetryLimit) {
-          log.info("Exception while fetching data, retrying: " + e.getMessage(), e);
-          retryCount++;
-        } else {
-          log.error("Exception while fetching data: " + e.getMessage(), e);
-          throw e;
-        }
-      }
-    } while (!success);
-  }
-
-  /**
-   * Get data from the bulk api input stream
-   * @return record set with each record as a JsonObject
-   */
-  private RecordSet<JsonElement> getBulkData() throws DataRecordException {
-    log.debug("Processing bulk api batch...");
-    RecordSetList<JsonElement> rs = new RecordSetList<>();
-
-    try {
-      // if Buffer is empty then get stream for the new resultset id
-      if (this.bulkBufferedReader == null || !this.bulkBufferedReader.ready()) {
-
-        // log the number of records from each result set after it is processed (bulkResultIdCount > 0)
-        if (this.bulkResultIdCount > 0) {
-          log.info("Result set {} had {} records", this.bulkResultIdCount,
-              this.bulkRecordCount - this.prevBulkRecordCount);
-        }
-
-        // if there is unprocessed resultset id then get result stream for that id
-        if (this.bulkResultIdCount < this.bulkResultIdList.size()) {
-          log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
-          this.setNewBulkResultSet(true);
-
-          if (this.bulkBufferedReader != null) {
-            this.bulkBufferedReader.close();
-          }
-
-          this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount);
-          this.bulkResultIdCount++;
-          this.prevBulkRecordCount = bulkRecordCount;
-        } else {
-          // if result stream processed for all resultset ids then finish the bulk job
-          log.info("Bulk job is finished");
-          this.setBulkJobFinished(true);
-          return rs;
-        }
-      }
-
-      // fetch a batch of results with retry for network errors
-      fetchResultBatchWithRetry(rs);
-
-    } catch (Exception e) {
-      throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
-    }
-
-    return rs;
-  }
-
   @Override
   public void closeConnection() throws Exception {
     if (this.bulkConnection != null
@@ -1078,12 +887,11 @@ public class SalesforceExtractor extends RestApiExtractor {
     return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
   }
 
-
-  private JobIdAndBatchIdResultIdList retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
+  private ResultFileIdsStruct retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
     Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x -> x.trim()).filter(x -> !x.equals("")).iterator();
     try {
       List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
-      return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
+      return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -1092,7 +900,7 @@ public class SalesforceExtractor extends RestApiExtractor {
   /**
    * Waits for the PK batches to complete. The wait will stop after all batches are complete or on the first failed batch
    */
-  private JobIdAndBatchIdResultIdList retrievePkChunkingResultIds(BulkConnection connection, String jobId, int waitMilliSecond) {
+  private ResultFileIdsStruct retrievePkChunkingResultIds(BulkConnection connection, String jobId, int waitMilliSecond) {
     log.info("Waiting for completion of the the bulk job [jobId={}])'s sub queries.", jobId);
     try {
       while (true) {
@@ -1107,7 +915,7 @@ public class SalesforceExtractor extends RestApiExtractor {
         Stream<BatchInfo> stream = Arrays.stream(batchInfos);
         Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator();
         List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
-        return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
+        return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -1177,7 +985,7 @@ public class SalesforceExtractor extends RestApiExtractor {
   }
 
   @Data
-  public static class JobIdAndBatchIdResultIdList {
+  public static class ResultFileIdsStruct {
     private final String jobId;
     private final List<BatchIdAndResultId> batchIdAndResultIdList;
   }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 96ad4b7..637df52 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -79,7 +79,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.gobblin.configuration.ConfigurationKeys.*;
 import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
-import org.apache.gobblin.salesforce.SalesforceExtractor.JobIdAndBatchIdResultIdList;
+
 import org.apache.gobblin.salesforce.SalesforceExtractor.BatchIdAndResultId;
 
 /**
@@ -171,11 +171,11 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
    * generate workUnit for pk chunking
    */
   private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
-    JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = executeQueryWithPkChunking(state, previousWatermark);
-    return createWorkUnits(sourceEntity, state, jobIdAndBatchIdResultIdList);
+    SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct = executeQueryWithPkChunking(state, previousWatermark);
+    return createWorkUnits(sourceEntity, state, resultFileIdsStruct);
   }
 
-  private JobIdAndBatchIdResultIdList executeQueryWithPkChunking(
+  private SalesforceExtractor.ResultFileIdsStruct executeQueryWithPkChunking(
       SourceState sourceState,
       long previousWatermark
   ) throws RuntimeException {
@@ -204,10 +204,10 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
       List<Predicate> predicateList = Arrays.asList(predicate);
       String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
 
-      if (state.contains(PK_CHUNKING_TEST_JOB_ID)) {
-        String jobId = state.getProp(PK_CHUNKING_TEST_JOB_ID, "");
+      if (state.contains(BULK_TEST_JOB_ID)) {
+        String jobId = state.getProp(BULK_TEST_JOB_ID, "");
         log.info("---Skip query, fetching result files directly for [jobId={}]", jobId);
-        String batchIdListStr = state.getProp(PK_CHUNKING_TEST_BATCH_ID_LIST);
+        String batchIdListStr = state.getProp(BULK_TEST_BATCH_ID_LIST);
         return salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, batchIdListStr);
       } else {
         log.info("---Pk Chunking query submit.");
@@ -226,7 +226,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   private List<WorkUnit> createWorkUnits(
       SourceEntity sourceEntity,
       SourceState state,
-      JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList
+      SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct
   ) {
     String nameSpaceName = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
     Extract.TableType tableType = Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase());
@@ -235,7 +235,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
 
     List<WorkUnit> workUnits = Lists.newArrayList();
     int partitionNumber = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, 1);
-    List<BatchIdAndResultId> batchResultIds = jobIdAndBatchIdResultIdList.getBatchIdAndResultIdList();
+    List<BatchIdAndResultId> batchResultIds = resultFileIdsStruct.getBatchIdAndResultIdList();
     int total = batchResultIds.size();
 
     // size of every partition should be: math.ceil(total/partitionNumber), use simpler way: (total+partitionNumber-1)/partitionNumber
@@ -245,10 +245,10 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
 
     for (List<BatchIdAndResultId> resultIds : partitionedResultIds) {
       WorkUnit workunit = new WorkUnit(extract);
-      String bulkJobId = jobIdAndBatchIdResultIdList.getJobId();
+      String bulkJobId = resultFileIdsStruct.getJobId();
       workunit.setProp(PK_CHUNKING_JOB_ID, bulkJobId);
       String resultIdStr = resultIds.stream().map(x -> x.getBatchId() + ":" + x.getResultId()).collect(Collectors.joining(","));
-      workunit.setProp(PK_CHUNKING_BATCH_RESULT_IDS, resultIdStr);
+      workunit.setProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS, resultIdStr);
       workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
       workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName());
       workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
@@ -771,4 +771,3 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
     private int probeCount = 0;
   }
 }
-