Posted to user@spark.apache.org by Rohit Pant <rp...@gmail.com> on 2022/05/21 14:45:12 UTC

Problem with implementing the Datasource V2 API for Salesforce

Hi all,

I am trying to implement a custom Spark Datasource for Salesforce by
implementing the Spark Datasource V2 interfaces. For querying Salesforce
data in parallel, I am planning to use the Salesforce Bulk V1 API:
https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm

I am aware that there is an existing Salesforce Spark library by Springml,
but it doesn't support the authentication types I need and isn't compatible
with Spark 3.x.

I am not using the Bulk V2 Salesforce API because it is serial in nature: you
submit a query, Salesforce automatically creates the batches, and you then
need to poll for results and iterate over the batches using the batch
locators returned in the response header.

I am planning to make it work like this -


   1. The user specifies the options numPartitions and dbtable. Using these,
   internally I will fetch the record count for that Salesforce object and
   derive the chunk size to use for querying the data with PK chunking.
   https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
   2. The Bulk API is asynchronous. You need to create a job and then create
   query batches within it. If you specify that you want to use PK chunking,
   along with the desired chunk size, then Salesforce automatically creates
   the batches for you. You then need to poll for and fetch the results of
   each batch using a batch-specific URL.
   3. I am planning to pass the batch ID to each executor using an
   InputPartition object. Each executor will then poll for and fetch the
   results (see the sketch after this list).
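
For illustration, here is a minimal sketch (Scala, against the Spark 3.x
org.apache.spark.sql.connector.read interfaces) of how the job and batch IDs
could be carried to the executors. SalesforceBatchPartition,
SalesforceReaderFactory, SalesforceBatchReader and SalesforceBulkClient are
hypothetical names of mine, not part of any existing library:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}

// Hypothetical partition: carries only small, serializable identifiers to the executors.
case class SalesforceBatchPartition(jobId: String, batchId: String) extends InputPartition

// Stubs standing in for the real Bulk V1 HTTP calls (create job, add the
// PK-chunked batches, poll batch state, fetch and parse results).
object SalesforceBulkClient {
  def createJobAndBatches(options: Map[String, String]): (String, Seq[String]) = ???
  def pollAndFetch(jobId: String, batchId: String,
                   options: Map[String, String]): Iterator[InternalRow] = ???
}

// Created on the driver, serialized to the executors.
class SalesforceReaderFactory(options: Map[String, String]) extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[SalesforceBatchPartition]
    new SalesforceBatchReader(p.jobId, p.batchId, options)
  }
}

// Runs on an executor: polls for the batch result and iterates over the rows.
class SalesforceBatchReader(jobId: String, batchId: String, options: Map[String, String])
    extends PartitionReader[InternalRow] {
  private lazy val rows = SalesforceBulkClient.pollAndFetch(jobId, batchId, options)
  override def next(): Boolean = rows.hasNext
  override def get(): InternalRow = rows.next()
  override def close(): Unit = ()
}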

I am having trouble deciding how I would go about creating the Bulk job and
submitting the batches on the driver node before dispatching the batch IDs
to the executors. I tried doing this in the implementation of the
planInputPartitions method of the Batch interface, but it turns out that it
is called 2-3 times per action (show, collect, etc.), thus creating
unnecessary Bulk jobs.
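
To make the problem concrete, this is a simplified sketch of the shape of
what I tried (using the hypothetical names from the sketch above, not my
exact code):

import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}

class SalesforceBatch(options: Map[String, String]) extends Batch {

  override def planInputPartitions(): Array[InputPartition] = {
    // Creating the job here means every call to planInputPartitions() submits
    // another Bulk job, and Spark may call this 2-3 times per action.
    val (jobId, batchIds) = SalesforceBulkClient.createJobAndBatches(options)
    batchIds.map(id => SalesforceBatchPartition(jobId, id): InputPartition).toArray
  }

  override def createReaderFactory(): PartitionReaderFactory =
    new SalesforceReaderFactory(options)
}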

One potential solution that might work is maintaining a set of hashed user
options in the static scope of the Batch implementation (using a companion
object) and only creating the job if it doesn't already exist in the set.
However, I find this solution very clumsy. Also, what happens if a user
submits multiple actions on a DataFrame? I could maybe add a TTL for the set
entries, but you can see how it gets complicated.
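
For concreteness, that workaround might look roughly like this (again only a
sketch with the hypothetical names from above, keying on the sorted option
pairs rather than a hash, and leaving out the TTL/cleanup part):

import scala.collection.mutable

// Hypothetical driver-side registry: one Bulk job per distinct set of user
// options, so repeated planInputPartitions() calls (even across Batch
// instances) reuse it. Expiry and cleanup after the job completes are
// deliberately left out.
object SalesforceJobRegistry {
  private val jobsByOptions = mutable.Map.empty[String, (String, Seq[String])]

  def getOrCreate(options: Map[String, String]): (String, Seq[String]) = synchronized {
    // Stable key built from the sorted option pairs.
    val key = options.toSeq.sorted.mkString(";")
    jobsByOptions.getOrElseUpdate(key, SalesforceBulkClient.createJobAndBatches(options))
  }
}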

Would really appreciate any pointers on the ideal way to achieve what I
want.

Regards,

Rohit Pant

Re: Problem with implementing the Datasource V2 API for Salesforce

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

In the spirit of not fitting the solution to the problem, would it not be
better to first create a producer for your job and use a broker like Kafka
or Kinesis or Pulsar?


Regards,
Gourav Sengupta
