You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/04/29 00:00:27 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1132] move the logic of requester list verification to RequesterService implementation

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b8a046  [GOBBLIN-1132] move the logic of requester list verification to RequesterService implementation
7b8a046 is described below

commit 7b8a046de84c8e91a2b97e2960574deb0d6ce23d
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Apr 28 17:00:20 2020 -0700

    [GOBBLIN-1132] move the logic of requester list verification to RequesterService implementation
    
    Closes #2969 from arjun4084346/requesterListFix
---
 .../extract/restapi/RestApiConnector.java          |  4 ++--
 .../gobblin/service/FlowConfigsResource.java       | 19 ++++++++-------
 .../gobblin/service/FlowConfigsV2Resource.java     | 11 ++++-----
 .../apache/gobblin/service/RequesterService.java   | 28 +++++++++++++++++-----
 .../gobblin/salesforce/SalesforceConnector.java    |  7 ++----
 gradle/scripts/dependencyDefinitions.gradle        |  3 +--
 6 files changed, 42 insertions(+), 30 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
index ae1f3aa..0c55d24 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
@@ -167,9 +167,9 @@ public abstract class RestApiConnector {
       }
 
       if (status.getStatusCode() >= 400) {
-        log.info("Unable to get response using: " + url);
+        log.info("Unable to get response using: {} got status code {}", url, status.getStatusCode());
         JsonElement jsonRet = GSON.fromJson(jsonStr, JsonArray.class);
-        throw new RestApiProcessingException(getFirstErrorMessage("Failed to retrieve response from", jsonRet));
+        throw new RestApiProcessingException(getFirstErrorMessage("Failed to retrieve response from ", jsonRet));
       }
     } catch (Exception e) {
       throw new RestApiProcessingException("Failed to process rest api request; error - " + e.getMessage(), e);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index a89eb02..6fdd608 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -88,15 +88,14 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
    */
   @Override
   public CreateResponse create(FlowConfig flowConfig) {
-    List<ServiceRequester> requestorList = this.requesterService.findRequesters(this);
+    List<ServiceRequester> requesterList = this.requesterService.findRequesters(this);
 
     try {
-      String serialized = this.requesterService.serialize(requestorList);
+      String serialized = RequesterService.serialize(requesterList);
       flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, serialized);
       LOG.info("Rest requester list is " + serialized);
     } catch (IOException e) {
-      throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
-          "cannot get who is the requester", e);
+      throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "cannot get who is the requester", e);
     }
     return this.flowConfigsResourceHandler.createFlowConfig(flowConfig);
   }
@@ -110,7 +109,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, FlowConfig flowConfig) {
-    checkRequester(get(key), this.requesterService.findRequesters(this));
+    checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -124,7 +123,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
    */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowId, EmptyRecord> key) {
-    checkRequester(get(key), this.requesterService.findRequesters(this));
+    checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -137,10 +136,12 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
    * If there is a failure when deserializing the original requester list, throw a {@link FlowConfigLoggedException} with
    * {@link HttpStatus#S_400_BAD_REQUEST}.
    *
+   * @param requesterService the {@link RequesterService} used to verify the requester
    * @param originalFlowConfig original flow config to find original requester
    * @param requesterList list of requesters for this request
    */
-  public static void checkRequester(FlowConfig originalFlowConfig, List<ServiceRequester> requesterList) {
+  public static void checkRequester(
+      RequesterService requesterService, FlowConfig originalFlowConfig, List<ServiceRequester> requesterList) {
     if (requesterList == null) {
       return;
     }
@@ -149,8 +150,8 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
       String serializedOriginalRequesterList = originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST);
       if (serializedOriginalRequesterList != null) {
         List<ServiceRequester> originalRequesterList = RequesterService.deserialize(serializedOriginalRequesterList);
-        if (!originalRequesterList.isEmpty() && (requesterList.isEmpty() || !originalRequesterList.containsAll(requesterList))) {
-          throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "Requester not in original requester list");
+        if (!requesterService.isRequesterAllowed(originalRequesterList, requesterList)) {
+          throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "Requester not allowed to make this request");
         }
       }
     } catch (IOException e) {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 8202994..77d64fc 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -96,12 +96,11 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
   public CreateKVResponse create(FlowConfig flowConfig) {
     List<ServiceRequester> requestorList = this.requesterService.findRequesters(this);
     try {
-      String serialized = this.requesterService.serialize(requestorList);
+      String serialized = RequesterService.serialize(requestorList);
       flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, serialized);
       LOG.info("Rest requester list is " + serialized);
     } catch (IOException e) {
-      throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
-          "cannot get who is the requester", e);
+      throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "cannot get who is the requester", e);
     }
     return (CreateKVResponse) this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
   }
@@ -115,7 +114,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowConfig flowConfig) {
-    FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
+    FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -130,7 +129,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, PatchRequest<FlowConfig> flowConfigPatch) {
-    FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
+    FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -144,7 +143,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
-    FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
+    FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java
index 5dcd0d7..847e9a3 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java
@@ -20,10 +20,15 @@ package org.apache.gobblin.service;
 import java.io.IOException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Base64;
+import java.util.Set;
+
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.ImmutableSet;
 import com.linkedin.restli.server.resources.BaseResource;
 import com.typesafe.config.Config;
 
@@ -50,8 +55,8 @@ public abstract class RequesterService {
    */
   public static String serialize(List<ServiceRequester> requesterList) throws IOException {
     String jsonList = objectMapper.writeValueAsString(requesterList);
-    String base64Str = Base64.getEncoder().encodeToString(jsonList.getBytes("UTF-8"));
-    return URLEncoder.encode(base64Str, "UTF-8");
+    String base64Str = Base64.getEncoder().encodeToString(jsonList.getBytes(StandardCharsets.UTF_8));
+    return URLEncoder.encode(base64Str, StandardCharsets.UTF_8.name());
   }
 
   /**
@@ -59,13 +64,24 @@ public abstract class RequesterService {
    * {@link #serialize(List)}.
    */
   public static List<ServiceRequester> deserialize(String encodedString) throws IOException {
-    String base64Str = URLDecoder.decode(encodedString, "UTF-8");
+    String base64Str = URLDecoder.decode(encodedString, StandardCharsets.UTF_8.name());
     byte[] decodedBytes = Base64.getDecoder().decode(base64Str);
-    String jsonList = new String(decodedBytes, "UTF-8");
+    String jsonList = new String(decodedBytes, StandardCharsets.UTF_8);
     TypeReference<List<ServiceRequester>> mapType = new TypeReference<List<ServiceRequester>>() {};
-    List<ServiceRequester> requesterList = objectMapper.readValue(jsonList, mapType);
-    return requesterList;
+    return objectMapper.readValue(jsonList, mapType);
   }
 
   protected abstract List<ServiceRequester> findRequesters(BaseResource resource);
+
+  /**
+   * returns true if the requester is allowed to make this request.
+   * This default implementation accepts all requesters.
+   * @param originalRequesterList original requester list
+   * @param currentRequesterList current requester list
+   * @return true if the requester is allowed to make this request, false otherwise
+   */
+  protected boolean isRequesterAllowed(
+      List<ServiceRequester> originalRequesterList, List<ServiceRequester> currentRequesterList){
+    return true;
+  }
 }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
index 1c89d84..77539be 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
+import org.apache.http.client.HttpClient;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
@@ -95,11 +96,7 @@ public class SalesforceConnector extends RestApiConnector {
     try {
       HttpPost post = new HttpPost(host + DEFAULT_AUTH_TOKEN_PATH);
       post.setEntity(new UrlEncodedFormEntity(formParams));
-
-      HttpResponse httpResponse = getHttpClient().execute(post);
-      HttpEntity httpEntity = httpResponse.getEntity();
-
-      return httpEntity;
+      return getHttpClient().execute(post).getEntity();
     } catch (Exception e) {
       throw new RestApiConnectionException("Failed to authenticate salesforce host:"
           + host + "; error-" + e.getMessage(), e);
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index a87b6e4..cf6f515 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -72,7 +72,7 @@ ext.externalDependency = [
     "hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
     "httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
     "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
-    "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
+    "httpcore": "org.apache.httpcomponents:httpcore:4.4.11",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
     "jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
     "jmh": "org.openjdk.jmh:jmh-core:1.17.3",
@@ -189,7 +189,6 @@ ext.externalDependency = [
     ],
     "postgresConnector": "org.postgresql:postgresql:42.1.4",
     "assertj": 'org.assertj:assertj-core:3.8.0',
-    "protobuf": 'com.google.protobuf:protobuf-java:3.6.1',
 ]
 
 if (!isDefaultEnvironment)