You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Atlas - Samir Souidi <SS...@atlasdataservices.com> on 2021/11/25 07:09:18 UTC

Spark salesforce connector

Dear all,
Do you know if there is any Spark connector for Salesforce?
Thanks
Sam

Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows


Re: Spark salesforce connector

Posted by daniel queiroz <da...@gmail.com>.
Hi,

https://github.com/springml/spark-salesforce.git

I've customized this code that I found; maybe it can help you.

import java.io._
import com.typesafe.config.{ Config, ConfigFactory }
import scalaj.http.{Http, HttpOptions, HttpResponse}
import org.apache.spark.sql.{Dataset, SparkSession}
import spark.implicits._
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}

class SalesforceBulk {

  // Looks up one entry of the "sds-keyvault-scope" secret scope through dbutils.
  private def keyVaultSecret(name: String) =
    dbutils.secrets.get(scope = "sds-keyvault-scope", key = name)

  // Connection settings. Every member is lazy, so a secret is only read the
  // first time it is accessed.
  lazy val domain = keyVaultSecret("sf-domain")
  lazy val userId = keyVaultSecret("sf-userId")
  lazy val tokenURL = keyVaultSecret("sf-tokenURL")
  lazy val sourceURL = keyVaultSecret("sf-sourceURL")
  lazy val passphrase = keyVaultSecret("sf-passphrase")
  lazy val private_key = keyVaultSecret("sf-private-key")
  lazy val consumer_key = keyVaultSecret("sf-consumer-key")


  /*
  *  query
  */


  // SOQL statements, one per Salesforce object. Each literal is assembled from
  // single-line segments so that no string spans a physical line break.
  // NOTE(review): ts_begin / ts_end appear as literal tokens and nothing in this
  // class substitutes them; presumably callers replace them with timestamps
  // before submitting the query - confirm against the caller.

  lazy val queryConnectedApplications: String =
    "SELECT MobileSessionTimeout, MobileStartUrl, Name, OptionsAllowAdminApprovedUsersOnly, " +
      "OptionsFullContentPushNotifications, OptionsHasSessionLevelPolicy, " +
      "OptionsIsInternal, OptionsRefreshTokenValidityMetric, PinLength, " +
      "RefreshTokenValidityPeriod, StartUrl FROM ConnectedApplication"

  lazy val queryIDPEventLogs: String =
    "SELECT AppId, AuthSessionId, ErrorCode, " +
      "IdentityUsed, InitiatedBy, OptionsHasLogoutUrl, SamlEntityUrl, SsoType, " +
      "Timestamp, UserId FROM IdpEventLog WHERE Timestamp >= ts_begin AND " +
      "Timestamp < ts_end"

  lazy val queryLoginHistory: String =
    "SELECT Id, ApiType, ApiVersion, " +
      "Application, AuthenticationServiceId, Browser, CipherSuite, ClientVersion, " +
      "CountryIso, LoginGeoId, LoginTime, LoginType, LoginUrl, OptionsIsGet, " +
      "OptionsIsPost, Platform, SourceIp, Status, TlsProtocol, UserId FROM " +
      "LoginHistory WHERE LoginTime >= ts_begin AND LoginTime < ts_end"

  lazy val queryREIDP_Application__c: String =
    "SELECT Id, OwnerId, IsDeleted, " +
      "Name, CurrencyIsoCode, CreatedDate, CreatedById, LastModifiedDate, " +
      "LastModifiedById, SystemModstamp, LastViewedDate, LastReferencedDate, " +
      "Approver__c, Can_User_Request_Access__c, Community__c, Connected_App__c, " +
      "Description__c, Is_Active__c, Is_Restricted__c, Permission_Set__c, " +
      "Short_Name__c, URL__c, Latest_Privacy_Policy_Version__c, " +
      "Latest_Terms_Conditions_Version__c FROM REIDP_Application__c WHERE " +
      "SystemModstamp >= ts_begin AND SystemModstamp < ts_end"

  lazy val queryREIDP_Request_Log__b: String =
    "SELECT Timestamp__c, CreatedById, " +
      "CreatedDate, SystemModstamp, Client_IP__c, Event_Type__c, " +
      "Execution_Time__c, Media_Type__c, Method__c, Request_Body__c, " +
      "Request_Headers__c, Request_Id__c, Request_Params__c, Request_Status__c, " +
      "Response_Body__c, Response_Status_Code__c, URI__c, User_Agent__c, " +
      "User_Id__c FROM REIDP_Request_Log__b WHERE Timestamp__c >= ts_begin AND " +
      "Timestamp__c < ts_end"

  lazy val queryREIDP_User_Consent__c: String =
    "SELECT Id, OwnerId, IsDeleted, " +
      "Name, CurrencyIsoCode, CreatedDate, CreatedById, LastModifiedDate, " +
      "LastModifiedById, SystemModstamp, Application__c, Accepted_Time__c, " +
      "Accepted__c, Application_Short_Name__c, Channel__c, Contact__c, Type__c, " +
      "User__c, Version__c FROM REIDP_User_Consent__c WHERE SystemModstamp >= " +
      "ts_begin AND SystemModstamp < ts_end"

  lazy val querySetupAuditTrail: String =
    "SELECT CreatedDate, Action, " +
      "CreatedByContext, CreatedByIssuer, DelegateUser, Display, Section FROM " +
      "SetupAuditTrail WHERE CreatedDate >= ts_begin AND CreatedDate < ts_end"

  lazy val queryUser: String =
    "SELECT Id, LastLoginDate, lastPasswordChangeDate, " +
      "CreatedDate, LastModifiedDate, REIDP_Did_Finish_Account_Setup__c, " +
      "REIDP_Social_Accounts__c FROM User WHERE LastModifiedDate >= ts_begin AND " +
      "LastModifiedDate < ts_end"

  lazy val queryVerificationHistory: String =
    "SELECT Activity, EventGroup, " +
      "LoginGeoId, LoginHistoryId, Policy, Remarks, ResourceId, SourceIp, Status, " +
      "UserId, VerificationMethod, VerificationTime FROM VerificationHistory WHERE " +
      "VerificationTime >= ts_begin AND VerificationTime < ts_end"

  lazy val queryLoginGeo: String =
    "SELECT Id, Country, CountryISO, Latitude, " +
      "Longitude, City, Subdivision, PostalCode, loginTime FROM LoginGeo WHERE " +
      "LoginTime >= ts_begin AND LoginTime < ts_end"



  /*
  *  request
  */

  // URL-safe Base64 without padding, taken from the JDK so the block does not
  // depend on an import that is not visible in this file.
  private def base64UrlNoPadding(text: String): String =
    java.util.Base64.getUrlEncoder.withoutPadding.encodeToString(text.getBytes("utf-8"))

  // JWT header: only the signing algorithm is declared.
  val header = """{"alg":"RS256"}"""
  val encodedHeader = base64UrlNoPadding(header)

  // Expiry claim: now + 300 seconds, as epoch seconds. This is a def (not a val)
  // so every assertion gets a fresh expiry instead of one frozen when the class
  // was constructed.
  def expiretime: String = ((System.currentTimeMillis() / 1000) + 300).toString

  // JWT claims built from the consumer key, domain and user id secrets.
  def payload: String =
    s"""{"iss":"${consumer_key}","aud":"${domain}","sub":"${userId}","exp":"${expiretime}"}"""
  def encodedpayload: String = base64UrlNoPadding(payload)

  // Unsigned token: <header>.<claims>
  def message: String = encodedHeader + "." + encodedpayload

  /*
  *  assertion
  */

  val grant_type = "urn:ietf:params:oauth:grant-type:jwt-bearer"

  // NOTE(review): encriptParam and readPrivateKey are not defined in the visible
  // source; presumably they load the private key and sign the message - confirm.
  def result = encriptParam(message, readPrivateKey(private_key))

  // Signed token. The message is captured once so that the text being signed and
  // the text being sent are the same value, even across a second boundary.
  def assertion: String = {
    val unsigned = message
    unsigned + "." + encriptParam(unsigned, readPrivateKey(private_key))
  }


  /*
  *  access token
  */
  /**
   * Requests an access token from `tokenURL` using the JWT bearer grant.
   *
   * @return the `access_token` field of the JSON response
   * @throws IllegalStateException if the endpoint answers with a non-2xx status
   */
  def retrieveAccessToken(): String = {

    val tokenResponse = Http(tokenURL)
      .timeout(connTimeoutMs = 20000, readTimeoutMs = 600000)
      .param("grant_type", grant_type)
      .param("assertion", assertion)
      .header("Content-Type", "application/x-www-form-urlencoded")
      .header("Charset", "UTF-8")
      .asString

    // Fail with the server's own message instead of a missing-column error
    // further down when the response is an error document.
    if (!tokenResponse.isSuccess) {
      throw new IllegalStateException(
        s"Token request failed with HTTP ${tokenResponse.code}: ${tokenResponse.body}")
    }

    // The body is a single JSON document; Spark's JSON reader is used to parse it.
    spark.read.json(Seq(tokenResponse.body).toDS())
      .select("access_token")
      .first()
      .mkString("")
  }


  /*
  *  create query job
  */
  /**
   * Posts a query job to `sourceURL` with PIPE as the column delimiter.
   *
   * @param selectQuery query text; it is JSON-escaped before being embedded in the request body
   * @param token       bearer token sent in the Authorization header
   * @return the `id` field of the JSON response
   * @throws IllegalStateException if the endpoint answers with a non-2xx status
   */
  def createQueryJob(selectQuery: String, token: String): String = {

    // Escape characters that would otherwise terminate or corrupt the JSON string.
    def jsonEscape(raw: String): String = raw.map {
      case '"'          => "\\\""
      case '\\'         => "\\\\"
      case '\n'         => "\\n"
      case '\r'         => "\\r"
      case '\t'         => "\\t"
      case c if c < ' ' => f"\\u${c.toInt}%04x"
      case c            => c.toString
    }.mkString

    val requestBody =
      s"""{"columnDelimiter": "PIPE", "operation": "query", "query": "${jsonEscape(selectQuery)}"}"""

    val createJobResponse = Http(sourceURL)
      .timeout(connTimeoutMs = 20000, readTimeoutMs = 600000)
      .postData(requestBody)
      .header("Authorization", "Bearer " + token)
      .header("Content-Type", "application/json")
      .header("Charset", "UTF-8")
      .asString

    if (!createJobResponse.isSuccess) {
      throw new IllegalStateException(
        s"Query job creation failed with HTTP ${createJobResponse.code}: ${createJobResponse.body}")
    }

    spark.read.json(Seq(createJobResponse.body).toDS())
      .select("id")
      .first()
      .mkString("")
  }


  /*
  *  returns true if query job is completed
  */
  /**
   * Polls the job at `sourceURL/<jobId>` every two seconds until it reports a
   * terminal state.
   *
   * @param jobId identifier of the query job
   * @param token bearer token sent in the Authorization header
   * @return true for "Completed", "JobComplete" or "CompletedWithWarnings";
   *         false for "Failed" or "Aborted"
   */
  def jobIsCompleted(jobId: String, token: String): Boolean = {

    // One status request; returns the `state` field of the JSON response.
    def fetchState(): String = {
      val jobStatus = Http(sourceURL + "/" + jobId)
        .timeout(connTimeoutMs = 20000, readTimeoutMs = 600000)
        .header("Authorization", "Bearer " + token)
        .header("Content-Type", "application/json")
        .header("Charset", "UTF-8")
        .asString

      spark.read.json(Seq(jobStatus.body).toDS())
        .select("state")
        .first()
        .mkString("")
    }

    // A local helper is effectively final, so the compiler can turn the
    // recursion into a loop; the public method itself could be overridden.
    @scala.annotation.tailrec
    def poll(): Boolean = fetchState() match {
      case "Completed" | "JobComplete" | "CompletedWithWarnings" => true
      // "Aborted" is terminal as well; without this case it would be polled forever.
      case "Failed" | "Aborted" => false
      case _ =>
        Thread.sleep(2000)
        poll()
    }

    poll()
  }

  /*
  *  retry a check with exponential backoff until it succeeds or the timeout elapses
  */
  /**
   * Calls `func` until it returns true or the accumulated sleep time reaches
   * `timeoutDuration`.
   *
   * The pause doubles before every sleep (so the first pause is twice
   * `initSleepInterval`), is capped at `maxSleepInterval`, is trimmed to the
   * remaining time budget, and is never shorter than one millisecond.
   *
   * @return the result of the last call to `func`
   */
  private def retryWithExponentialBackoff(
      func:() => Boolean,
      timeoutDuration: FiniteDuration,
      initSleepInterval: FiniteDuration,
      maxSleepInterval: FiniteDuration): Boolean = {

    val budgetMs = timeoutDuration.toMillis
    val capMs = maxSleepInterval.toMillis

    @scala.annotation.tailrec
    def attempt(elapsedMs: Long, previousIntervalMs: Long): Boolean =
      if (func()) {
        true
      } else {
        val intervalMs = math.min(previousIntervalMs * 2, capMs)
        val pauseMs = math.max(math.min(intervalMs, budgetMs - elapsedMs), 1L)
        Thread.sleep(pauseMs)
        val totalMs = elapsedMs + pauseMs
        if (totalMs < budgetMs) attempt(totalMs, intervalMs) else false
      }

    attempt(0L, initSleepInterval.toMillis)
  }

  /*
  *  wait until the job is completed
  */
  /**
   * Runs `jobIsCompleted` under the backoff helper: 600 s overall budget, pauses
   * starting from a 200 ms base and capped at 10 s.
   *
   * @return the outcome of the last `jobIsCompleted` call
   */
  def awaitJobCompleted(jobId: String, token: String): Boolean =
    retryWithExponentialBackoff(
      func = () => jobIsCompleted(jobId, token),
      timeoutDuration = FiniteDuration(600000L, MILLISECONDS),
      initSleepInterval = FiniteDuration(200L, MILLISECONDS),
      maxSleepInterval = FiniteDuration(10000L, MILLISECONDS))


  /*
  *  retrieve query data
  */
  /**
   * Downloads the results of a query job and parses them as pipe-separated CSV
   * with a header row and schema inference.
   *
   * @param jobId identifier of the query job
   * @param token bearer token sent in the Authorization header
   * @return the parsed rows
   * @throws IllegalStateException if the endpoint answers with a non-2xx status
   */
  def retrieveQueryResults(jobId: String, token: String): org.apache.spark.sql.DataFrame = {

    // NOTE(review): only one request with maxRecords=500000 is issued; whether
    // larger result sets need follow-up requests should be checked against the API.
    val response = Http(sourceURL + "/" + jobId + "/results?maxRecords=500000")
      .timeout(connTimeoutMs = 20000, readTimeoutMs = 600000)
      .header("Authorization", "Bearer " + token)
      .asString

    // Without this check an error document would be parsed as if it were CSV data.
    if (!response.isSuccess) {
      throw new IllegalStateException(
        s"Result download failed with HTTP ${response.code}: ${response.body}")
    }

    // The body is split as-is. stripMargin must not be applied: it removes a
    // leading '|' from every line, and '|' is the column separator here, so rows
    // with an empty first column would lose that column.
    val csvLines: Dataset[String] =
      spark.sparkContext.parallelize(response.body.linesIterator.toList).toDS()

    spark.read
      .option("header", true)
      .option("inferSchema", true)
      .option("sep", "|")
      .csv(csvLines)
  }
}


Regards,

Daniel Queiroz
+55 81 996289671


Em qui., 25 de nov. de 2021 às 04:09, Atlas - Samir Souidi <
SSouidi@atlasdataservices.com> escreveu:

> Dear all,
>
> Do you know if there is any spark connector to SalesForce?
>
> Thanks
>
> Sam
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>