Posted to user@spark.apache.org by Rohit Pant <rp...@gmail.com> on 2022/05/21 14:45:12 UTC
Problem with implementing the Datasource V2 API for Salesforce
Hi all,
I am trying to implement a custom Spark data source for Salesforce by
implementing the Spark Datasource V2 interfaces. To query Salesforce
data in parallel, I am planning to use the Salesforce Bulk V1 API:
https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm
I am aware that there is an existing Salesforce Spark library by Springml,
but it doesn't support the authentication types I need and isn't compatible
with Spark 3.x.
I am not using the Salesforce Bulk V2 API because it is serial in nature:
you submit a query, Salesforce automatically creates the batches, and you
then need to poll for results and iterate over the batches using the batch
locators returned in the response header.
I am planning to make it work like this:
1. The user specifies the options numPartitions and dbtable. Using these,
I will internally fetch the record count for that Salesforce object and
deduce the chunk size to use for querying the data with PK chunking.
https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
2. The Bulk API is asynchronous. You create a job and then create query
batches within it. If you specify that you want PK chunking, along with
the chunk size to use, Salesforce automatically creates the batches for
you. You then need to poll for and fetch the results of each batch using
a batch-specific URL.
3. I am planning to pass each batch ID to an executor using an
InputPartition object. Each executor will then poll for and fetch its
results.
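To make step 1 concrete, the chunk-size derivation could be sketched like
this (an illustration only; ChunkSizing and its names are mine, and the
250,000 cap is Salesforce's documented PK chunking maximum):

```scala
object ChunkSizing {
  // Salesforce allows a PK chunking chunk size between 1 and 250,000 records.
  val MaxChunkSize = 250000

  // Aim for roughly `numPartitions` batches over `recordCount` records,
  // clamped to Salesforce's allowed range.
  def chunkSize(recordCount: Long, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val raw = math.ceil(recordCount.toDouble / numPartitions).toLong
    math.min(MaxChunkSize.toLong, math.max(1L, raw)).toInt
  }
}
```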
I am having trouble deciding how to create the Bulk job and submit the
batches on the driver node before dispatching the batch IDs to the
executors. I tried doing this in my implementation of the
planInputPartitions method of the Batch interface, but it turns out that
it is called two to three times per action (show, collect, etc.), thus
creating unnecessary Bulk jobs.
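If Spark reuses the same Batch instance across those calls, one way to
keep job creation idempotent is a lazy val on the Batch implementation,
so repeated planning calls hit a single Bulk job (a sketch with stand-in
types rather than the real DSv2 signatures; it would not help if Spark
constructed a fresh Batch per call):

```scala
// Stand-in sketch: the real class would implement
// org.apache.spark.sql.connector.read.Batch.
final class SalesforceBatch(createBulkJob: () => Seq[String]) {
  // Evaluated at most once per Batch instance, on first access, so
  // repeated planning calls reuse the same Bulk job's batch IDs.
  private lazy val batchIds: Seq[String] = createBulkJob()

  def planInputPartitions(): Array[String] = batchIds.toArray
}
```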
One potential solution is to maintain a set of hashed user options in the
static scope of my Batch implementation (using a companion object) and
only create the job if it doesn't already exist in the set. However, I
find this solution very clumsy. Also, what happens if a user submits
multiple actions on the same dataframe? I could perhaps add a TTL for the
set entries, but you can see how it gets complicated.
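For what it's worth, the companion-object idea can at least be made
race-free by keying a ConcurrentHashMap on the sorted options, since
computeIfAbsent invokes the creation function at most once per key (an
illustrative sketch; createBulkJob stands in for the actual Bulk API
call, and entries would still accumulate without the TTL or cleanup
mentioned above):

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative companion-object-style cache: one Bulk job ID per distinct
// set of read options; `createBulkJob` stands in for the Salesforce call.
object BulkJobRegistry {
  private val jobs = new ConcurrentHashMap[String, String]()

  def getOrCreate(options: Map[String, String])(createBulkJob: () => String): String = {
    // Sort the options so the key is deterministic regardless of map order.
    val key = options.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString("&")
    jobs.computeIfAbsent(key, _ => createBulkJob())
  }
}
```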
Would really appreciate any pointers on the ideal way to achieve what I
want.
Regards,
Rohit Pant
Re: Problem with implementing the Datasource V2 API for Salesforce
Posted by Gourav Sengupta <go...@gmail.com>.
Hi,
In the spirit of not fitting the solution to the problem, would it not be
better to first create a producer for your job and use a broker like
Kafka, Kinesis, or Pulsar?
Regards,
Gourav Sengupta
On Sat, May 21, 2022 at 3:46 PM Rohit Pant <rp...@gmail.com> wrote: