You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/01 10:17:32 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-426] Always assign user to pipeline operation requests
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 542a5b0 [STREAMPIPES-426] Always assign user to pipeline operation requests
542a5b0 is described below
commit 542a5b00055148f26e85e40e4f8ca8ca8dda1fe7
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 1 11:17:23 2021 +0100
[STREAMPIPES-426] Always assign user to pipeline operation requests
---
.../manager/execution/http/GraphSubmitter.java | 8 +++---
.../manager/execution/http/HttpRequestBuilder.java | 31 +++++++++++++++++++---
.../manager/health/PipelineHealthCheck.java | 2 +-
.../manager/preview/PipelinePreview.java | 4 +--
.../resource/management/UserResourceManager.java | 4 +++
.../user/management/jwt/JwtTokenProvider.java | 12 +++++++++
6 files changed, 51 insertions(+), 10 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index a215f6c..e3db01c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -110,21 +110,21 @@ public class GraphSubmitter {
private PipelineElementStatus performInvocation(InvocableStreamPipesEntity entity) {
String endpointUrl = entity.getSelectedEndpointUrl();
- return new HttpRequestBuilder(entity, endpointUrl).invoke();
+ return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).invoke();
}
private PipelineElementStatus performInvocation(SpDataSet dataset) {
String endpointUrl = dataset.getSelectedEndpointUrl();
- return new HttpRequestBuilder(dataset, endpointUrl).invoke();
+ return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).invoke();
}
private PipelineElementStatus performDetach(InvocableStreamPipesEntity entity) {
String endpointUrl = entity.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(entity.getElementId());
- return new HttpRequestBuilder(entity, endpointUrl).detach();
+ return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).detach();
}
private PipelineElementStatus performDetach(SpDataSet dataset) {
String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/" + dataset.getDatasetInvocationId();
- return new HttpRequestBuilder(dataset, endpointUrl).detach();
+ return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).detach();
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index 93e66c6..54653ff 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -22,8 +22,12 @@ import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
+import org.apache.streampipes.manager.storage.UserService;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.client.user.Permission;
+import org.apache.streampipes.model.client.user.Principal;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.user.management.jwt.JwtTokenProvider;
import org.slf4j.Logger;
@@ -37,12 +41,16 @@ public class HttpRequestBuilder {
private final NamedStreamPipesEntity payload;
private final String endpointUrl;
+ private String pipelineId;
private final static Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class);
- public HttpRequestBuilder(NamedStreamPipesEntity payload, String endpointUrl) {
+ public HttpRequestBuilder(NamedStreamPipesEntity payload,
+ String endpointUrl,
+ String pipelineId) {
this.payload = payload;
this.endpointUrl = endpointUrl;
+ this.pipelineId = pipelineId;
}
public PipelineElementStatus invoke() {
@@ -57,6 +65,7 @@ public class HttpRequestBuilder {
.execute();
return handleResponse(httpResp);
} catch (Exception e) {
+ e.printStackTrace();
LOG.error(e.getMessage());
return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
}
@@ -89,7 +98,23 @@ public class HttpRequestBuilder {
}
private String getAuthToken() {
- Authentication auth = SecurityContextHolder.getContext().getAuthentication();
- return "Bearer " + new JwtTokenProvider().createToken(auth);
+ if (SecurityContextHolder.getContext().getAuthentication() != null) {
+ Authentication auth = SecurityContextHolder.getContext().getAuthentication();
+ return "Bearer " + new JwtTokenProvider().createToken(auth);
+ } else {
+ if (this.pipelineId != null) {
+ String ownerSid = new SpResourceManager()
+ .managePermissions()
+ .findForObjectId(this.pipelineId)
+ .stream()
+ .findFirst()
+ .map(Permission::getOwnerSid).orElseThrow(() -> new IllegalArgumentException("Could not find owner for pipeline"));
+
+ Principal correspondingUser = new SpResourceManager().manageUsers().getPrincipalById(ownerSid);
+ return "Bearer " + new JwtTokenProvider().createToken(correspondingUser);
+ } else {
+ throw new IllegalArgumentException("No authenticated user found to associate with invocation request");
+ }
+ }
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 76afadc..217b73c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -71,7 +71,7 @@ public class PipelineHealthCheck implements Runnable {
boolean success;
try {
endpointUrl = findEndpointUrl(graph);
- success = new HttpRequestBuilder(graph, endpointUrl).invoke().isSuccess();
+ success = new HttpRequestBuilder(graph, endpointUrl, pipeline.getPipelineId()).invoke().isSuccess();
} catch (NoServiceEndpointsAvailableException e) {
success = false;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index ddc7bd3..45540a2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -78,11 +78,11 @@ public class PipelinePreview {
}
private void invokeGraphs(List<InvocableStreamPipesEntity> graphs) {
- graphs.forEach(g -> new HttpRequestBuilder(g, g.getBelongsTo()).invoke());
+ graphs.forEach(g -> new HttpRequestBuilder(g, g.getBelongsTo(), null).invoke());
}
private void detachGraphs(List<InvocableStreamPipesEntity> graphs) {
- graphs.forEach(g -> new HttpRequestBuilder(g, g.getUri()).detach());
+ graphs.forEach(g -> new HttpRequestBuilder(g, g.getUri(), null).detach());
}
private void deleteGraphs(String previewId) {
diff --git a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
index 7b45830..ec6973b 100644
--- a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
+++ b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
@@ -53,6 +53,10 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> {
return StorageDispatcher.INSTANCE.getNoSqlStore().getUserStorageAPI();
}
+ public Principal getPrincipalById(String principalId) {
+ return db.getUserById(principalId);
+ }
+
public boolean registerUser(RegistrationData data) throws UsernameAlreadyTakenException {
try {
diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
index d18e499..9221be4 100644
--- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
+++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.client.user.UserAccount;
import org.apache.streampipes.model.client.user.UserInfo;
import org.apache.streampipes.security.jwt.JwtTokenUtils;
import org.apache.streampipes.user.management.model.PrincipalUserDetails;
+import org.apache.streampipes.user.management.util.GrantedAuthoritiesBuilder;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
@@ -52,6 +53,17 @@ public class JwtTokenProvider {
.map(GrantedAuthority::getAuthority)
.collect(Collectors.toSet());
+ return createToken(userPrincipal, roles);
+
+ }
+
+ public String createToken(Principal userPrincipal) {
+ Set<String> roles = new GrantedAuthoritiesBuilder(userPrincipal).buildAllAuthorities();
+ return createToken(userPrincipal, roles);
+ }
+
+ public String createToken(Principal userPrincipal,
+ Set<String> roles) {
Date tokenExpirationDate = makeExpirationDate();
Map<String, Object> claims = makeClaims(userPrincipal, roles);