You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/04/29 00:00:27 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1132] move the
logic of requester list verification to RequesterService implementation
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7b8a046 [GOBBLIN-1132] move the logic of requester list verification to RequesterService implementation
7b8a046 is described below
commit 7b8a046de84c8e91a2b97e2960574deb0d6ce23d
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Apr 28 17:00:20 2020 -0700
[GOBBLIN-1132] move the logic of requester list verification to RequesterService implementation
Closes #2969 from arjun4084346/requesterListFix
---
.../extract/restapi/RestApiConnector.java | 4 ++--
.../gobblin/service/FlowConfigsResource.java | 19 ++++++++-------
.../gobblin/service/FlowConfigsV2Resource.java | 11 ++++-----
.../apache/gobblin/service/RequesterService.java | 28 +++++++++++++++++-----
.../gobblin/salesforce/SalesforceConnector.java | 7 ++----
gradle/scripts/dependencyDefinitions.gradle | 3 +--
6 files changed, 42 insertions(+), 30 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
index ae1f3aa..0c55d24 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
@@ -167,9 +167,9 @@ public abstract class RestApiConnector {
}
if (status.getStatusCode() >= 400) {
- log.info("Unable to get response using: " + url);
+ log.info("Unable to get response using: {} got status code {}", url, status.getStatusCode());
JsonElement jsonRet = GSON.fromJson(jsonStr, JsonArray.class);
- throw new RestApiProcessingException(getFirstErrorMessage("Failed to retrieve response from", jsonRet));
+ throw new RestApiProcessingException(getFirstErrorMessage("Failed to retrieve response from ", jsonRet));
}
} catch (Exception e) {
throw new RestApiProcessingException("Failed to process rest api request; error - " + e.getMessage(), e);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index a89eb02..6fdd608 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -88,15 +88,14 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
*/
@Override
public CreateResponse create(FlowConfig flowConfig) {
- List<ServiceRequester> requestorList = this.requesterService.findRequesters(this);
+ List<ServiceRequester> requesterList = this.requesterService.findRequesters(this);
try {
- String serialized = this.requesterService.serialize(requestorList);
+ String serialized = RequesterService.serialize(requesterList);
flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, serialized);
LOG.info("Rest requester list is " + serialized);
} catch (IOException e) {
- throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
- "cannot get who is the requester", e);
+ throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "cannot get who is the requester", e);
}
return this.flowConfigsResourceHandler.createFlowConfig(flowConfig);
}
@@ -110,7 +109,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, FlowConfig flowConfig) {
- checkRequester(get(key), this.requesterService.findRequesters(this));
+ checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -124,7 +123,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
*/
@Override
public UpdateResponse delete(ComplexResourceKey<FlowId, EmptyRecord> key) {
- checkRequester(get(key), this.requesterService.findRequesters(this));
+ checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -137,10 +136,12 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
* If there is a failure when deserializing the original requester list, throw a {@link FlowConfigLoggedException} with
* {@link HttpStatus#S_400_BAD_REQUEST}.
*
+ * @param requesterService the {@link RequesterService} used to verify the requester
* @param originalFlowConfig original flow config to find original requester
* @param requesterList list of requesters for this request
*/
- public static void checkRequester(FlowConfig originalFlowConfig, List<ServiceRequester> requesterList) {
+ public static void checkRequester(
+ RequesterService requesterService, FlowConfig originalFlowConfig, List<ServiceRequester> requesterList) {
if (requesterList == null) {
return;
}
@@ -149,8 +150,8 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
String serializedOriginalRequesterList = originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST);
if (serializedOriginalRequesterList != null) {
List<ServiceRequester> originalRequesterList = RequesterService.deserialize(serializedOriginalRequesterList);
- if (!originalRequesterList.isEmpty() && (requesterList.isEmpty() || !originalRequesterList.containsAll(requesterList))) {
- throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "Requester not in original requester list");
+ if (!requesterService.isRequesterAllowed(originalRequesterList, requesterList)) {
+ throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "Requester not allowed to make this request");
}
}
} catch (IOException e) {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 8202994..77d64fc 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -96,12 +96,11 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
public CreateKVResponse create(FlowConfig flowConfig) {
List<ServiceRequester> requestorList = this.requesterService.findRequesters(this);
try {
- String serialized = this.requesterService.serialize(requestorList);
+ String serialized = RequesterService.serialize(requestorList);
flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, serialized);
LOG.info("Rest requester list is " + serialized);
} catch (IOException e) {
- throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
- "cannot get who is the requester", e);
+ throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "cannot get who is the requester", e);
}
return (CreateKVResponse) this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
}
@@ -115,7 +114,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowConfig flowConfig) {
- FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
+ FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -130,7 +129,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, PatchRequest<FlowConfig> flowConfigPatch) {
- FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
+ FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -144,7 +143,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
- FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
+ FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java
index 5dcd0d7..847e9a3 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java
@@ -20,10 +20,15 @@ package org.apache.gobblin.service;
import java.io.IOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Base64;
+import java.util.Set;
+
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.ImmutableSet;
import com.linkedin.restli.server.resources.BaseResource;
import com.typesafe.config.Config;
@@ -50,8 +55,8 @@ public abstract class RequesterService {
*/
public static String serialize(List<ServiceRequester> requesterList) throws IOException {
String jsonList = objectMapper.writeValueAsString(requesterList);
- String base64Str = Base64.getEncoder().encodeToString(jsonList.getBytes("UTF-8"));
- return URLEncoder.encode(base64Str, "UTF-8");
+ String base64Str = Base64.getEncoder().encodeToString(jsonList.getBytes(StandardCharsets.UTF_8));
+ return URLEncoder.encode(base64Str, StandardCharsets.UTF_8.name());
}
/**
@@ -59,13 +64,24 @@ public abstract class RequesterService {
* {@link #serialize(List)}.
*/
public static List<ServiceRequester> deserialize(String encodedString) throws IOException {
- String base64Str = URLDecoder.decode(encodedString, "UTF-8");
+ String base64Str = URLDecoder.decode(encodedString, StandardCharsets.UTF_8.name());
byte[] decodedBytes = Base64.getDecoder().decode(base64Str);
- String jsonList = new String(decodedBytes, "UTF-8");
+ String jsonList = new String(decodedBytes, StandardCharsets.UTF_8);
TypeReference<List<ServiceRequester>> mapType = new TypeReference<List<ServiceRequester>>() {};
- List<ServiceRequester> requesterList = objectMapper.readValue(jsonList, mapType);
- return requesterList;
+ return objectMapper.readValue(jsonList, mapType);
}
protected abstract List<ServiceRequester> findRequesters(BaseResource resource);
+
+ /**
+ * returns true if the requester is allowed to make this request.
+ * This default implementation accepts all requesters.
+ * @param originalRequesterList original requester list
+ * @param currentRequesterList current requester list
+ * @return true if the requester is allowed to make this request, false otherwise
+ */
+ protected boolean isRequesterAllowed(
+ List<ServiceRequester> originalRequesterList, List<ServiceRequester> currentRequesterList){
+ return true;
+ }
}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
index 1c89d84..77539be 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
+import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
@@ -95,11 +96,7 @@ public class SalesforceConnector extends RestApiConnector {
try {
HttpPost post = new HttpPost(host + DEFAULT_AUTH_TOKEN_PATH);
post.setEntity(new UrlEncodedFormEntity(formParams));
-
- HttpResponse httpResponse = getHttpClient().execute(post);
- HttpEntity httpEntity = httpResponse.getEntity();
-
- return httpEntity;
+ return getHttpClient().execute(post).getEntity();
} catch (Exception e) {
throw new RestApiConnectionException("Failed to authenticate salesforce host:"
+ host + "; error-" + e.getMessage(), e);
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index a87b6e4..cf6f515 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -72,7 +72,7 @@ ext.externalDependency = [
"hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
"httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
"httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
- "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
+ "httpcore": "org.apache.httpcomponents:httpcore:4.4.11",
"httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
"jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
"jmh": "org.openjdk.jmh:jmh-core:1.17.3",
@@ -189,7 +189,6 @@ ext.externalDependency = [
],
"postgresConnector": "org.postgresql:postgresql:42.1.4",
"assertj": 'org.assertj:assertj-core:3.8.0',
- "protobuf": 'com.google.protobuf:protobuf-java:3.6.1',
]
if (!isDefaultEnvironment)