You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Atlas - Samir Souidi <SS...@atlasdataservices.com> on 2021/11/25 07:09:18 UTC
Spark salesforce connector
Dear all,
Do you know if there is a Spark connector for Salesforce?
Thanks
Sam
Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for Windows
Re: Spark salesforce connector
Posted by daniel queiroz <da...@gmail.com>.
Hi,
https://github.com/springml/spark-salesforce.git
I've customized some code I found; maybe it can help you.
import java.io._
import com.typesafe.config.{ Config, ConfigFactory }
import scalaj.http.{Http, HttpOptions, HttpResponse}
import org.apache.spark.sql.{Dataset, SparkSession}
import spark.implicits._
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
/** Minimal Salesforce Bulk API 2.0 query client intended for a Databricks notebook.
  *
  * Typical flow: [[retrieveAccessToken]] -> [[createQueryJob]] -> [[awaitJobCompleted]] ->
  * [[retrieveQueryResults]].
  *
  * Relies on names that must already be in scope, as in a Databricks notebook:
  * `dbutils` (secret access), `spark` with `spark.implicits._` imported, `Base64`
  * (`encodeBase64URLSafeString`), scalaj-http `Http`, `DataFrame`/`Dataset`, and the helpers
  * `encriptParam` / `readPrivateKey`, which are not shown here
  * (NOTE(review): presumably they RS256-sign a message with the PEM key stored in
  * `sf-private-key`; confirm).
  *
  * Constructing an instance reads four secrets and signs a JWT assertion (see `assertion`).
  */
class SalesforceBulk {

  // ---- constants: declared first because some are used while the constructor runs ----

  /** Databricks secret scope holding the Salesforce credentials. */
  private val SecretScope = "sds-keyvault-scope"

  /** Seconds between signing a JWT assertion and its `exp` claim. */
  private val JwtLifetimeSeconds = 300L

  /** HTTP connect / read timeouts used for every request. */
  private val ConnTimeoutMs = 20000
  private val ReadTimeoutMs = 600000

  /** Job states after which polling stops and the job counts as successful. */
  private val CompletedStates = Set("Completed", "JobComplete", "CompletedWithWarnings")

  /** Job states after which polling stops and the job counts as unsuccessful. */
  private val FailedStates = Set("Failed", "Aborted")

  // ---- credentials ----
  // Declared lazy, but `domain`, `userId`, `consumer_key` and `private_key` are forced by the
  // JWT vals below while the constructor runs.

  lazy val domain: String = secret("sf-domain")
  lazy val userId: String = secret("sf-userId")
  lazy val tokenURL: String = secret("sf-tokenURL")
  lazy val sourceURL: String = secret("sf-sourceURL")
  /** Read from the secret scope but not used by this class. */
  lazy val passphrase: String = secret("sf-passphrase")
  lazy val private_key: String = secret("sf-private-key")
  lazy val consumer_key: String = secret("sf-consumer-key")

  // ---- SOQL queries ----
  // Queries with a WHERE clause contain the literal placeholders `ts_begin` and `ts_end`.
  // They must be substituted by the caller before calling `createQueryJob`
  // (NOTE(review): presumably with SOQL datetime literals; confirm against callers).

  lazy val queryConnectedApplications: String =
    "SELECT MobileSessionTimeout, MobileStartUrl, Name, OptionsAllowAdminApprovedUsersOnly, " +
      "OptionsFullContentPushNotifications, OptionsHasSessionLevelPolicy, OptionsIsInternal, " +
      "OptionsRefreshTokenValidityMetric, PinLength, RefreshTokenValidityPeriod, StartUrl " +
      "FROM ConnectedApplication"

  lazy val queryIDPEventLogs: String =
    "SELECT AppId, AuthSessionId, ErrorCode, IdentityUsed, InitiatedBy, OptionsHasLogoutUrl, " +
      "SamlEntityUrl, SsoType, Timestamp, UserId FROM IdpEventLog " +
      "WHERE Timestamp >= ts_begin AND Timestamp < ts_end"

  lazy val queryLoginHistory: String =
    "SELECT Id, ApiType, ApiVersion, Application, AuthenticationServiceId, Browser, " +
      "CipherSuite, ClientVersion, CountryIso, LoginGeoId, LoginTime, LoginType, LoginUrl, " +
      "OptionsIsGet, OptionsIsPost, Platform, SourceIp, Status, TlsProtocol, UserId " +
      "FROM LoginHistory WHERE LoginTime >= ts_begin AND LoginTime < ts_end"

  lazy val queryREIDP_Application__c: String =
    "SELECT Id, OwnerId, IsDeleted, Name, CurrencyIsoCode, CreatedDate, CreatedById, " +
      "LastModifiedDate, LastModifiedById, SystemModstamp, LastViewedDate, " +
      "LastReferencedDate, Approver__c, Can_User_Request_Access__c, Community__c, " +
      "Connected_App__c, Description__c, Is_Active__c, Is_Restricted__c, Permission_Set__c, " +
      "Short_Name__c, URL__c, Latest_Privacy_Policy_Version__c, " +
      "Latest_Terms_Conditions_Version__c FROM REIDP_Application__c " +
      "WHERE SystemModstamp >= ts_begin AND SystemModstamp < ts_end"

  lazy val queryREIDP_Request_Log__b: String =
    "SELECT Timestamp__c, CreatedById, CreatedDate, SystemModstamp, Client_IP__c, " +
      "Event_Type__c, Execution_Time__c, Media_Type__c, Method__c, Request_Body__c, " +
      "Request_Headers__c, Request_Id__c, Request_Params__c, Request_Status__c, " +
      "Response_Body__c, Response_Status_Code__c, URI__c, User_Agent__c, User_Id__c " +
      "FROM REIDP_Request_Log__b WHERE Timestamp__c >= ts_begin AND Timestamp__c < ts_end"

  lazy val queryREIDP_User_Consent__c: String =
    "SELECT Id, OwnerId, IsDeleted, Name, CurrencyIsoCode, CreatedDate, CreatedById, " +
      "LastModifiedDate, LastModifiedById, SystemModstamp, Application__c, Accepted_Time__c, " +
      "Accepted__c, Application_Short_Name__c, Channel__c, Contact__c, Type__c, User__c, " +
      "Version__c FROM REIDP_User_Consent__c " +
      "WHERE SystemModstamp >= ts_begin AND SystemModstamp < ts_end"

  lazy val querySetupAuditTrail: String =
    "SELECT CreatedDate, Action, CreatedByContext, CreatedByIssuer, DelegateUser, Display, " +
      "Section FROM SetupAuditTrail WHERE CreatedDate >= ts_begin AND CreatedDate < ts_end"

  lazy val queryUser: String =
    "SELECT Id, LastLoginDate, lastPasswordChangeDate, CreatedDate, LastModifiedDate, " +
      "REIDP_Did_Finish_Account_Setup__c, REIDP_Social_Accounts__c FROM User " +
      "WHERE LastModifiedDate >= ts_begin AND LastModifiedDate < ts_end"

  lazy val queryVerificationHistory: String =
    "SELECT Activity, EventGroup, LoginGeoId, LoginHistoryId, Policy, Remarks, ResourceId, " +
      "SourceIp, Status, UserId, VerificationMethod, VerificationTime FROM VerificationHistory " +
      "WHERE VerificationTime >= ts_begin AND VerificationTime < ts_end"

  lazy val queryLoginGeo: String =
    "SELECT Id, Country, CountryISO, Latitude, Longitude, City, Subdivision, PostalCode, " +
      "loginTime FROM LoginGeo WHERE LoginTime >= ts_begin AND LoginTime < ts_end"

  // ---- JWT assertion built at construction time ----
  // Kept as public vals for backward compatibility. `retrieveAccessToken` signs a fresh
  // assertion instead, because this one expires JwtLifetimeSeconds after construction.

  val header: String = """{"alg":"RS256"}"""
  val encodedHeader: String = Base64.encodeBase64URLSafeString(header.getBytes("utf-8"))
  val expiretime: String = jwtExpiry()
  val payload: String = jwtClaims(expiretime)
  val encodedpayload: String = Base64.encodeBase64URLSafeString(payload.getBytes("utf-8"))
  val message: String = encodedHeader + "." + encodedpayload

  val grant_type: String = "urn:ietf:params:oauth:grant-type:jwt-bearer"
  val result = encriptParam(message, readPrivateKey(private_key))
  val assertion: String = message + "." + result

  /** Exchanges a freshly signed JWT assertion for an OAuth access token.
    *
    * @return the `access_token` field of the token endpoint's JSON response
    * @throws IllegalStateException if the token endpoint answers with a non-2xx status
    */
  def retrieveAccessToken(): String = {
    val response = Http(tokenURL)
      .timeout(connTimeoutMs = ConnTimeoutMs, readTimeoutMs = ReadTimeoutMs)
      .param("grant_type", grant_type)
      .param("assertion", signedAssertion(jwtExpiry()))
      .header("Content-Type", "application/x-www-form-urlencoded")
      .header("Charset", "UTF-8")
      .asString
    requireSuccess(response.code, response.body, "token request")
    jsonField(response.body, "access_token")
  }

  /** Creates a Bulk API query job whose results use `|` as the column delimiter.
    *
    * @param selectQuery SOQL text; it is JSON-escaped, so quotes and newlines are safe
    * @param token       access token from [[retrieveAccessToken]]
    * @return the new job's `id`
    * @throws IllegalStateException if the job endpoint answers with a non-2xx status
    */
  def createQueryJob(selectQuery: String, token: String): String = {
    val requestBody =
      s"""{"columnDelimiter": "PIPE", "operation": "query", "query": ${jsonString(selectQuery)}}"""
    val response = Http(sourceURL)
      .timeout(connTimeoutMs = ConnTimeoutMs, readTimeoutMs = ReadTimeoutMs)
      .postData(requestBody)
      .header("Authorization", "Bearer " + token)
      .header("Content-Type", "application/json")
      .header("Charset", "UTF-8")
      .asString
    requireSuccess(response.code, response.body, "query job creation")
    jsonField(response.body, "id")
  }

  /** Blocks until the job reaches a terminal state, polling every 2 seconds, with no timeout.
    *
    * Prefer [[awaitJobCompleted]], which backs off exponentially and gives up after a timeout.
    *
    * @return true for a completed state, false for `Failed` or `Aborted`
    */
  def jobIsCompleted(jobId: String, token: String): Boolean = {
    // Iterative, so a long-running job cannot overflow the stack.
    @scala.annotation.tailrec
    def pollUntilTerminal(): String = {
      val state = fetchJobState(jobId, token)
      if (CompletedStates(state) || FailedStates(state)) state
      else {
        Thread.sleep(2000)
        pollUntilTerminal()
      }
    }
    CompletedStates(pollUntilTerminal())
  }

  /** Calls `func` until it returns true or `timeoutDuration` of sleeping has elapsed.
    *
    * The sleep interval doubles before each wait (starting from `initSleepInterval`), is
    * capped at `maxSleepInterval`, and is clipped so the total wait does not exceed the
    * timeout. `func` is called once more after the final sleep.
    *
    * @return the last value returned by `func`
    */
  private def retryWithExponentialBackoff(
      func: () => Boolean,
      timeoutDuration: FiniteDuration,
      initSleepInterval: FiniteDuration,
      maxSleepInterval: FiniteDuration): Boolean = {
    val timeout = timeoutDuration.toMillis
    val maxSleep = maxSleepInterval.toMillis
    var waited = 0L
    var sleepInterval = initSleepInterval.toMillis
    var done = func()
    while (!done && waited < timeout) {
      sleepInterval = math.min(sleepInterval * 2, maxSleep)
      val sleepTime = math.max(1L, math.min(sleepInterval, timeout - waited))
      Thread.sleep(sleepTime)
      waited += sleepTime
      done = func()
    }
    done
  }

  /** Waits for the job to finish, polling with exponential backoff (200 ms up to 10 s).
    *
    * Stops polling as soon as the job reaches a terminal state. A failed or aborted job is
    * therefore reported immediately, not after the full timeout.
    *
    * @param timeout maximum total time spent sleeping between polls (default 10 minutes)
    * @return true if the job completed; false if it failed, was aborted, or timed out
    */
  def awaitJobCompleted(
      jobId: String,
      token: String,
      timeout: FiniteDuration = FiniteDuration(600000L, MILLISECONDS)): Boolean = {
    var lastState = ""
    retryWithExponentialBackoff(
      () => {
        lastState = fetchJobState(jobId, token)
        CompletedStates(lastState) || FailedStates(lastState)
      },
      timeout,
      FiniteDuration(200L, MILLISECONDS),
      FiniteDuration(10000L, MILLISECONDS))
    CompletedStates(lastState)
  }

  /** Downloads every result page of a completed job and parses it as `|`-separated CSV.
    *
    * Each page repeats the header row; only the first page's header is kept. All pages are
    * held in driver memory before parsing. Records are split on line breaks, so quoted
    * values that contain newlines will not parse correctly.
    *
    * @param maxRecordsPerPage value sent as `maxRecords` on each page request
    * @return a DataFrame with a header-derived, inferred schema
    * @throws IllegalStateException if a results request answers with a non-2xx status
    */
  def retrieveQueryResults(
      jobId: String,
      token: String,
      maxRecordsPerPage: Int = 500000): DataFrame = {
    val pages = fetchResultPages(jobId, token, maxRecordsPerPage)
    val lines = pages.zipWithIndex.flatMap { case (page, index) =>
      // No stripMargin here: it would remove a leading '|' (an empty first column).
      val pageLines = page.split("\r?\n").toSeq.filter(_.nonEmpty)
      if (index == 0) pageLines else pageLines.drop(1)
    }
    val csvData: Dataset[String] = spark.sparkContext.parallelize(lines).toDS()
    spark.read
      .option("header", true)
      .option("inferSchema", true)
      .option("sep", "|")
      .csv(csvData)
  }

  // ---- private helpers ----

  /** Reads one secret from [[SecretScope]]. */
  private def secret(key: String): String = dbutils.secrets.get(scope = SecretScope, key = key)

  /** JWT `exp` value: now (epoch seconds) plus the assertion lifetime, as a string. */
  private def jwtExpiry(): String =
    ((System.currentTimeMillis() / 1000) + JwtLifetimeSeconds).toString

  /** JWT claim set with string-valued `iss`, `aud`, `sub` and `exp`. */
  private def jwtClaims(expiresAt: String): String =
    Seq("iss" -> consumer_key, "aud" -> domain, "sub" -> userId, "exp" -> expiresAt)
      .map { case (name, value) => jsonString(name) + ":" + jsonString(value) }
      .mkString("{", ",", "}")

  /** Builds a complete `header.claims.signature` assertion expiring at `expiresAt`. */
  private def signedAssertion(expiresAt: String): String = {
    val encodedClaims =
      Base64.encodeBase64URLSafeString(jwtClaims(expiresAt).getBytes("utf-8"))
    val signingInput = encodedHeader + "." + encodedClaims
    signingInput + "." + encriptParam(signingInput, readPrivateKey(private_key))
  }

  /** Fetches the job's current `state` with a single status request. */
  private def fetchJobState(jobId: String, token: String): String = {
    val response = Http(sourceURL + "/" + jobId)
      .timeout(connTimeoutMs = ConnTimeoutMs, readTimeoutMs = ReadTimeoutMs)
      .header("Authorization", "Bearer " + token)
      .header("Content-Type", "application/json")
      .header("Charset", "UTF-8")
      .asString
    requireSuccess(response.code, response.body, s"status request for job $jobId")
    jsonField(response.body, "state")
  }

  /** Fetches all result pages.
    *
    * Follows the `Sforce-Locator` response header until it is absent or equals the string
    * "null".
    */
  private def fetchResultPages(jobId: String, token: String, maxRecords: Int): Vector[String] = {
    val url = sourceURL + "/" + jobId + "/results"

    @scala.annotation.tailrec
    def loop(locator: Option[String], acc: Vector[String]): Vector[String] = {
      val baseRequest = Http(url)
        .timeout(connTimeoutMs = ConnTimeoutMs, readTimeoutMs = ReadTimeoutMs)
        .param("maxRecords", maxRecords.toString)
        .header("Authorization", "Bearer " + token)
      val response = locator.fold(baseRequest)(l => baseRequest.param("locator", l)).asString
      requireSuccess(response.code, response.body, s"result request for job $jobId")
      val pages = acc :+ response.body
      response.header("Sforce-Locator").filterNot(l => l.isEmpty || l == "null") match {
        case Some(next) => loop(Some(next), pages)
        case None       => pages
      }
    }

    loop(None, Vector.empty)
  }

  /** Parses `body` as JSON and returns the given top-level field as a string. */
  private def jsonField(body: String, field: String): String =
    spark.read.json(Seq(body).toDS()).select(field).first().mkString("")

  /** Throws with the response body when `code` is not a 2xx status. */
  private def requireSuccess(code: Int, body: String, action: String): Unit =
    if (code < 200 || code >= 300)
      throw new IllegalStateException(s"Salesforce $action failed with HTTP $code: $body")

  /** Encodes `s` as a quoted JSON string, escaping quotes, backslashes and control chars. */
  private def jsonString(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case c if c < ' ' => sb.append('\\').append("u%04x".format(c.toInt))
      case c            => sb.append(c)
    }
    sb.append('"').toString
  }
}
Regards,
Daniel Queiroz
+55 81 996289671
Em qui., 25 de nov. de 2021 às 04:09, Atlas - Samir Souidi <
SSouidi@atlasdataservices.com> escreveu:
> Dear all,
>
> Do you know if there is any spark connector to SalesForce?
>
> Thanks
>
> Sam
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>