You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/17 19:00:31 UTC

[incubator-streampipes] branch STREAMPIPES-319 updated: [STREAMPIPES-319] Fetch assets dynamically based on available endpoints

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-319 by this push:
     new f1d3aaa  [STREAMPIPES-319] Fetch assets dynamically based on available endpoints
f1d3aaa is described below

commit f1d3aaabbaafb31bcc30d32d60dfbf4076f978c6
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Thu Jun 17 21:00:16 2021 +0200

    [STREAMPIPES-319] Fetch assets dynamically based on available endpoints
---
 .../org/apache/streampipes/manager/assets/AssetFetcher.java | 13 ++++++++-----
 .../org/apache/streampipes/manager/assets/AssetManager.java |  6 ++++--
 .../execution/http/PipelineElementEndpointGenerator.java    |  5 +----
 .../manager/execution/http/PipelineExecutor.java            |  2 +-
 .../manager/verification/DataProcessorVerifier.java         |  3 ++-
 .../streampipes/manager/verification/DataSinkVerifier.java  |  3 ++-
 .../manager/verification/DataStreamVerifier.java            |  3 ++-
 7 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
index 7f8e3bc..06d0891 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
@@ -18,6 +18,8 @@
 package org.apache.streampipes.manager.assets;
 
 import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.manager.execution.http.PipelineElementEndpointGenerator;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,18 +28,19 @@ public class AssetFetcher {
 
   private static final String ASSET_ENDPOINT_APPENDIX = "/assets";
 
-  private String pipelineElementUri;
+  private PipelineElementUrl pipelineElementUrl;
   private String appId;
 
-  public AssetFetcher(String pipelineElementUri, String appId) {
-    this.pipelineElementUri = pipelineElementUri;
+  public AssetFetcher(PipelineElementUrl pipelineElementUrl,
+                      String appId) {
+    this.pipelineElementUrl = pipelineElementUrl;
     this.appId = appId;
   }
 
   public InputStream fetchPipelineElementAssets() throws IOException {
-
+    String endpointUrl = new PipelineElementEndpointGenerator(appId, pipelineElementUrl).getEndpointResourceUrl();
     return Request
-            .Get(pipelineElementUri + ASSET_ENDPOINT_APPENDIX)
+            .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX)
             .execute()
             .returnContent()
             .asStream();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
index 95baeb3..19096c7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.manager.assets;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,8 +42,9 @@ public class AssetManager {
     return Files.readAllBytes(Paths.get(getAssetPath(appId, assetName)));
   }
 
-  public static void storeAsset(String pipelineElementUri, String appId) throws IOException {
-    InputStream assetStream = new AssetFetcher(pipelineElementUri, appId)
+  public static void storeAsset(PipelineElementUrl pipelineElementUrl,
+                                String appId) throws IOException {
+    InputStream assetStream = new AssetFetcher(pipelineElementUrl, appId)
             .fetchPipelineElementAssets();
     new AssetExtractor(assetStream, appId).extractAssetContents();
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
index 191453b..3449f9e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
@@ -30,13 +30,10 @@ public class PipelineElementEndpointGenerator {
   private static final Logger LOG = LoggerFactory.getLogger(PipelineElementEndpointGenerator.class);
 
   private String appId;
-  private String elementId;
   private PipelineElementUrl pipelineElementUrl;
 
-  public PipelineElementEndpointGenerator(String elementId,
-                                          String appId,
+  public PipelineElementEndpointGenerator(String appId,
                                           PipelineElementUrl pipelineElementUrl) {
-    this.elementId = elementId;
     this.appId = appId;
     this.pipelineElementUrl = pipelineElementUrl;
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index 7ac5c72..8cae245 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -80,7 +80,7 @@ public class PipelineExecutor {
 
     decryptSecrets(graphs);
 
-    graphs.forEach(g -> g.setSelectedEndpointUrl(new PipelineElementEndpointGenerator(g.getElementId(),
+    graphs.forEach(g -> g.setSelectedEndpointUrl(new PipelineElementEndpointGenerator(
             g.getAppId(),
             getPipelineElementType(g))
             .getEndpointResourceUrl()));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
index 95382ef..e18fd32 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.verification;
 
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
@@ -66,7 +67,7 @@ public class DataProcessorVerifier extends ElementVerifier<DataProcessorDescript
   @Override
   protected void storeAssets() throws IOException {
     if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(elementDescription.getElementId(), elementDescription.getAppId());
+      AssetManager.storeAsset(PipelineElementUrl.DATA_PROCESSOR, elementDescription.getAppId());
     }
   }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
index 2f3e9b6..a8c6060 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.verification;
 
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.graph.DataSinkDescription;
@@ -75,7 +76,7 @@ public class DataSinkVerifier extends ElementVerifier<DataSinkDescription> {
 	@Override
 	protected void storeAssets() throws IOException  {
 		if (elementDescription.isIncludesAssets()) {
-			AssetManager.storeAsset(elementDescription.getElementId(), elementDescription.getAppId());
+			AssetManager.storeAsset(PipelineElementUrl.DATA_SINK, elementDescription.getAppId());
 		}
 	}
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
index a87b1b2..b7d1b90 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.verification;
 
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.SpDataStream;
 
@@ -72,7 +73,7 @@ public class DataStreamVerifier extends ElementVerifier<SpDataStream> {
   @Override
   protected void storeAssets() throws IOException {
     if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(elementDescription.getElementId(), elementDescription.getAppId());
+      AssetManager.storeAsset(PipelineElementUrl.DATA_STREAM, elementDescription.getAppId());
     }
   }