You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/21 18:50:25 UTC

incubator-gobblin git commit: [GOBBLIN-434] support Salesforce refresh token

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 85ba58d51 -> 1c5fb6ec4


[GOBBLIN-434] support Salesforce refresh token

Closes #2309 from haojiwu/salesforce_refresh_token


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1c5fb6ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1c5fb6ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1c5fb6ec

Branch: refs/heads/master
Commit: 1c5fb6ec48baf00839aeb3e8427c2c9ca3783e0b
Parents: 85ba58d
Author: Haoji Wu <ho...@linkedin.com>
Authored: Wed Mar 21 11:50:17 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Mar 21 11:50:17 2018 -0700

----------------------------------------------------------------------
 .../gobblin/salesforce/SalesforceConnector.java | 57 +++++++++++++++++---
 .../gobblin/salesforce/SalesforceExtractor.java | 32 ++++++++---
 2 files changed, 74 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c5fb6ec/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
index f95e7f7..6ba7965 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
@@ -25,6 +25,7 @@ import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.message.BasicNameValuePair;
 
 import com.google.common.collect.Lists;
@@ -46,9 +47,15 @@ public class SalesforceConnector extends RestApiConnector {
 
   private static final String DEFAULT_SERVICES_DATA_PATH = "/services/data";
   private static final String DEFAULT_AUTH_TOKEN_PATH = "/services/oauth2/token";
+  protected String refreshToken;
 
   public SalesforceConnector(State state) {
     super(state);
+    if (isPasswordGrant(state)) {
+      this.refreshToken = null;
+    } else {
+      this.refreshToken = state.getProp(ConfigurationKeys.SOURCE_CONN_REFRESH_TOKEN);
+    }
   }
 
   @Getter
@@ -59,18 +66,26 @@ public class SalesforceConnector extends RestApiConnector {
     log.debug("Authenticating salesforce");
     String clientId = this.state.getProp(ConfigurationKeys.SOURCE_CONN_CLIENT_ID);
     String clientSecret = this.state.getProp(ConfigurationKeys.SOURCE_CONN_CLIENT_SECRET);
-    String userName = this.state.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME);
-    String password = PasswordManager.getInstance(this.state)
-        .readPassword(this.state.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD));
-    String securityToken = this.state.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN);
     String host = this.state.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
 
     List<NameValuePair> formParams = Lists.newArrayList();
-    formParams.add(new BasicNameValuePair("grant_type", "password"));
     formParams.add(new BasicNameValuePair("client_id", clientId));
     formParams.add(new BasicNameValuePair("client_secret", clientSecret));
-    formParams.add(new BasicNameValuePair("username", userName));
-    formParams.add(new BasicNameValuePair("password", password + securityToken));
+
+    if (refreshToken == null) {
+      log.info("Authenticating salesforce with username/password");
+      String userName = this.state.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME);
+      String password = PasswordManager.getInstance(this.state)
+          .readPassword(this.state.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD));
+      String securityToken = this.state.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN);
+      formParams.add(new BasicNameValuePair("grant_type", "password"));
+      formParams.add(new BasicNameValuePair("username", userName));
+      formParams.add(new BasicNameValuePair("password", password + securityToken));
+    } else {
+      log.info("Authenticating salesforce with refresh_token");
+      formParams.add(new BasicNameValuePair("grant_type", "refresh_token"));
+      formParams.add(new BasicNameValuePair("refresh_token", refreshToken));
+    }
     try {
       HttpPost post = new HttpPost(host + DEFAULT_AUTH_TOKEN_PATH);
       post.setEntity(new UrlEncodedFormEntity(formParams));
@@ -80,11 +95,29 @@ public class SalesforceConnector extends RestApiConnector {
 
       return httpEntity;
     } catch (Exception e) {
-      throw new RestApiConnectionException("Failed to authenticate salesforce using user:" + userName + " and host:"
+      throw new RestApiConnectionException("Failed to authenticate salesforce host:"
           + host + "; error-" + e.getMessage(), e);
     }
   }
 
+  @Override
+  protected void addHeaders(HttpRequestBase httpRequest) {
+    if (refreshToken == null) {
+      super.addHeaders(httpRequest);
+    } else {
+      if (this.accessToken != null) {
+        httpRequest.addHeader("Authorization", "Bearer " + this.accessToken);
+      }
+      httpRequest.addHeader("Content-Type", "application/json");
+    }
+  }
+
+  static boolean isPasswordGrant(State state) {
+    String userName = state.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME);
+    String securityToken = state.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN);
+    return (userName != null &&  securityToken != null);
+  }
+
   private String getServiceBaseUrl() {
     String dataEnvPath = DEFAULT_SERVICES_DATA_PATH + "/v" + this.state.getProp(ConfigurationKeys.SOURCE_CONN_VERSION);
     this.servicesDataEnvPath = dataEnvPath;
@@ -94,4 +127,12 @@ public class SalesforceConnector extends RestApiConnector {
   public String getFullUri(String resourcePath) {
     return StringUtils.removeEnd(getServiceBaseUrl(), "/") + StringUtils.removeEnd(resourcePath, "/");
   }
+
+  String getAccessToken() {
+    return accessToken;
+  }
+
+  String getInstanceUrl() {
+    return instanceUrl;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c5fb6ec/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 0c16051..4442214 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 
+import org.apache.gobblin.salesforce.SalesforceConfigurationKeys;
 import org.apache.http.HttpEntity;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.methods.HttpGet;
@@ -98,7 +99,7 @@ public class SalesforceExtractor extends RestApiExtractor {
   public static final String SALESFORCE_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'.000Z'";
   private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd";
   private static final String SALESFORCE_HOUR_FORMAT = "HH";
-  private static final String SALESFORCE_SOAP_AUTH_SERVICE = "/services/Soap/u";
+  private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u";
   private static final Gson GSON = new Gson();
   private static final int MAX_PK_CHUNKING_SIZE = 250000;
   private static final int MIN_PK_CHUNKING_SIZE = 100000;
@@ -641,7 +642,7 @@ public class SalesforceExtractor extends RestApiExtractor {
       apiVersion = "29.0";
     }
 
-    String soapAuthEndPoint = hostName + SALESFORCE_SOAP_AUTH_SERVICE + "/" + apiVersion;
+    String soapAuthEndPoint = hostName + SALESFORCE_SOAP_SERVICE + "/" + apiVersion;
     try {
       ConnectorConfig partnerConfig = new ConnectorConfig();
       if (super.workUnitState.contains(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL)
@@ -650,12 +651,29 @@ public class SalesforceExtractor extends RestApiExtractor {
             super.workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_CONN_USE_PROXY_PORT));
       }
 
-      String securityToken = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN);
-      String password = PasswordManager.getInstance(this.workUnitState)
-          .readPassword(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD));
-      partnerConfig.setUsername(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME));
-      partnerConfig.setPassword(password + securityToken);
+      String accessToken = sfConnector.getAccessToken();
+
+      if (accessToken == null) {
+        boolean isConnectSuccess = sfConnector.connect();
+        if (isConnectSuccess) {
+          accessToken = sfConnector.getAccessToken();
+        }
+      }
+
+      if (accessToken != null) {
+        String serviceEndpoint = sfConnector.getInstanceUrl() + SALESFORCE_SOAP_SERVICE + "/" + apiVersion;
+        partnerConfig.setSessionId(accessToken);
+        partnerConfig.setServiceEndpoint(serviceEndpoint);
+      } else {
+        String securityToken = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN);
+        String password = PasswordManager.getInstance(this.workUnitState)
+            .readPassword(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD));
+        partnerConfig.setUsername(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME));
+        partnerConfig.setPassword(password + securityToken);
+      }
+
       partnerConfig.setAuthEndpoint(soapAuthEndPoint);
+
       new PartnerConnection(partnerConfig);
       String soapEndpoint = partnerConfig.getServiceEndpoint();
       String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/")) + "async/" + apiVersion;