You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/01 10:17:32 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-426] Always assign user to pipeline operation requests

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 542a5b0  [STREAMPIPES-426] Always assign user to pipeline operation requests
542a5b0 is described below

commit 542a5b00055148f26e85e40e4f8ca8ca8dda1fe7
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 1 11:17:23 2021 +0100

    [STREAMPIPES-426] Always assign user to pipeline operation requests
---
 .../manager/execution/http/GraphSubmitter.java     |  8 +++---
 .../manager/execution/http/HttpRequestBuilder.java | 31 +++++++++++++++++++---
 .../manager/health/PipelineHealthCheck.java        |  2 +-
 .../manager/preview/PipelinePreview.java           |  4 +--
 .../resource/management/UserResourceManager.java   |  4 +++
 .../user/management/jwt/JwtTokenProvider.java      | 12 +++++++++
 6 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index a215f6c..e3db01c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -110,21 +110,21 @@ public class GraphSubmitter {
 
   private PipelineElementStatus performInvocation(InvocableStreamPipesEntity entity) {
     String endpointUrl = entity.getSelectedEndpointUrl();
-    return new HttpRequestBuilder(entity, endpointUrl).invoke();
+    return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).invoke();
   }
 
   private PipelineElementStatus performInvocation(SpDataSet dataset) {
     String endpointUrl = dataset.getSelectedEndpointUrl();
-    return new HttpRequestBuilder(dataset, endpointUrl).invoke();
+    return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).invoke();
   }
 
   private PipelineElementStatus performDetach(InvocableStreamPipesEntity entity) {
     String endpointUrl = entity.getSelectedEndpointUrl() + "/" + InstanceIdExtractor.extractId(entity.getElementId());
-    return new HttpRequestBuilder(entity, endpointUrl).detach();
+    return new HttpRequestBuilder(entity, endpointUrl, this.pipelineId).detach();
   }
 
   private PipelineElementStatus performDetach(SpDataSet dataset) {
     String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/" + dataset.getDatasetInvocationId();
-    return new HttpRequestBuilder(dataset, endpointUrl).detach();
+    return new HttpRequestBuilder(dataset, endpointUrl, this.pipelineId).detach();
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index 93e66c6..54653ff 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -22,8 +22,12 @@ import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
+import org.apache.streampipes.manager.storage.UserService;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.client.user.Permission;
+import org.apache.streampipes.model.client.user.Principal;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.resource.management.SpResourceManager;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.user.management.jwt.JwtTokenProvider;
 import org.slf4j.Logger;
@@ -37,12 +41,16 @@ public class HttpRequestBuilder {
 
   private final NamedStreamPipesEntity payload;
   private final String endpointUrl;
+  private String pipelineId;
 
   private final static Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class);
 
-  public HttpRequestBuilder(NamedStreamPipesEntity payload, String endpointUrl) {
+  public HttpRequestBuilder(NamedStreamPipesEntity payload,
+                            String endpointUrl,
+                            String pipelineId) {
     this.payload = payload;
     this.endpointUrl = endpointUrl;
+    this.pipelineId = pipelineId;
   }
 
   public PipelineElementStatus invoke() {
@@ -57,6 +65,7 @@ public class HttpRequestBuilder {
                       .execute();
       return handleResponse(httpResp);
     } catch (Exception e) {
+      e.printStackTrace();
       LOG.error(e.getMessage());
       return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
     }
@@ -89,7 +98,23 @@ public class HttpRequestBuilder {
   }
 
   private String getAuthToken() {
-    Authentication auth = SecurityContextHolder.getContext().getAuthentication();
-    return "Bearer " + new JwtTokenProvider().createToken(auth);
+    if (SecurityContextHolder.getContext().getAuthentication() != null) {
+      Authentication auth = SecurityContextHolder.getContext().getAuthentication();
+      return "Bearer " + new JwtTokenProvider().createToken(auth);
+    } else {
+      if (this.pipelineId != null) {
+        String ownerSid = new SpResourceManager()
+                .managePermissions()
+                .findForObjectId(this.pipelineId)
+                .stream()
+                .findFirst()
+                .map(Permission::getOwnerSid).orElseThrow(() -> new IllegalArgumentException("Could not find owner for pipeline"));
+
+        Principal correspondingUser = new SpResourceManager().manageUsers().getPrincipalById(ownerSid);
+        return "Bearer " + new JwtTokenProvider().createToken(correspondingUser);
+      } else {
+        throw new IllegalArgumentException("No authenticated user found to associate with invocation request");
+      }
+    }
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 76afadc..217b73c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -71,7 +71,7 @@ public class PipelineHealthCheck implements Runnable {
               boolean success;
               try {
                 endpointUrl = findEndpointUrl(graph);
-                success = new HttpRequestBuilder(graph, endpointUrl).invoke().isSuccess();
+                success = new HttpRequestBuilder(graph, endpointUrl, pipeline.getPipelineId()).invoke().isSuccess();
               } catch (NoServiceEndpointsAvailableException e) {
                 success = false;
               }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index ddc7bd3..45540a2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -78,11 +78,11 @@ public class PipelinePreview {
   }
 
   private void invokeGraphs(List<InvocableStreamPipesEntity> graphs) {
-    graphs.forEach(g -> new HttpRequestBuilder(g, g.getBelongsTo()).invoke());
+    graphs.forEach(g -> new HttpRequestBuilder(g, g.getBelongsTo(), null).invoke());
   }
 
   private void detachGraphs(List<InvocableStreamPipesEntity> graphs) {
-    graphs.forEach(g -> new HttpRequestBuilder(g, g.getUri()).detach());
+    graphs.forEach(g -> new HttpRequestBuilder(g, g.getUri(), null).detach());
   }
 
   private void deleteGraphs(String previewId) {
diff --git a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
index 7b45830..ec6973b 100644
--- a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
+++ b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java
@@ -53,6 +53,10 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> {
     return StorageDispatcher.INSTANCE.getNoSqlStore().getUserStorageAPI();
   }
 
+  public Principal getPrincipalById(String principalId) {
+    return db.getUserById(principalId);
+  }
+
   public boolean registerUser(RegistrationData data) throws UsernameAlreadyTakenException {
 
     try {
diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
index d18e499..9221be4 100644
--- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
+++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.client.user.UserAccount;
 import org.apache.streampipes.model.client.user.UserInfo;
 import org.apache.streampipes.security.jwt.JwtTokenUtils;
 import org.apache.streampipes.user.management.model.PrincipalUserDetails;
+import org.apache.streampipes.user.management.util.GrantedAuthoritiesBuilder;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.GrantedAuthority;
 
@@ -52,6 +53,17 @@ public class JwtTokenProvider {
 						.map(GrantedAuthority::getAuthority)
 						.collect(Collectors.toSet());
 
+		return createToken(userPrincipal, roles);
+
+	}
+
+	public String createToken(Principal userPrincipal) {
+		Set<String> roles = new GrantedAuthoritiesBuilder(userPrincipal).buildAllAuthorities();
+		return createToken(userPrincipal, roles);
+	}
+
+	public String createToken(Principal userPrincipal,
+							  Set<String> roles) {
 		Date tokenExpirationDate = makeExpirationDate();
 		Map<String, Object> claims = makeClaims(userPrincipal, roles);