You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/17 19:00:31 UTC
[incubator-streampipes] branch STREAMPIPES-319 updated:
[STREAMPIPES-319] Fetch assets dynamically based on available endpoints
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-319 by this push:
new f1d3aaa [STREAMPIPES-319] Fetch assets dynamically based on available endpoints
f1d3aaa is described below
commit f1d3aaabbaafb31bcc30d32d60dfbf4076f978c6
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Thu Jun 17 21:00:16 2021 +0200
[STREAMPIPES-319] Fetch assets dynamically based on available endpoints
---
.../org/apache/streampipes/manager/assets/AssetFetcher.java | 13 ++++++++-----
.../org/apache/streampipes/manager/assets/AssetManager.java | 6 ++++--
.../execution/http/PipelineElementEndpointGenerator.java | 5 +----
.../manager/execution/http/PipelineExecutor.java | 2 +-
.../manager/verification/DataProcessorVerifier.java | 3 ++-
.../streampipes/manager/verification/DataSinkVerifier.java | 3 ++-
.../manager/verification/DataStreamVerifier.java | 3 ++-
7 files changed, 20 insertions(+), 15 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
index 7f8e3bc..06d0891 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.manager.assets;
import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.manager.execution.http.PipelineElementEndpointGenerator;
import java.io.IOException;
import java.io.InputStream;
@@ -26,18 +28,19 @@ public class AssetFetcher {
private static final String ASSET_ENDPOINT_APPENDIX = "/assets";
- private String pipelineElementUri;
+ private PipelineElementUrl pipelineElementUrl;
private String appId;
- public AssetFetcher(String pipelineElementUri, String appId) {
- this.pipelineElementUri = pipelineElementUri;
+ public AssetFetcher(PipelineElementUrl pipelineElementUrl,
+ String appId) {
+ this.pipelineElementUrl = pipelineElementUrl;
this.appId = appId;
}
public InputStream fetchPipelineElementAssets() throws IOException {
-
+ String endpointUrl = new PipelineElementEndpointGenerator(appId, pipelineElementUrl).getEndpointResourceUrl();
return Request
- .Get(pipelineElementUri + ASSET_ENDPOINT_APPENDIX)
+ .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX)
.execute()
.returnContent()
.asStream();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
index 95baeb3..19096c7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.manager.assets;
import org.apache.commons.io.FileUtils;
import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
import java.io.File;
import java.io.IOException;
@@ -41,8 +42,9 @@ public class AssetManager {
return Files.readAllBytes(Paths.get(getAssetPath(appId, assetName)));
}
- public static void storeAsset(String pipelineElementUri, String appId) throws IOException {
- InputStream assetStream = new AssetFetcher(pipelineElementUri, appId)
+ public static void storeAsset(PipelineElementUrl pipelineElementUrl,
+ String appId) throws IOException {
+ InputStream assetStream = new AssetFetcher(pipelineElementUrl, appId)
.fetchPipelineElementAssets();
new AssetExtractor(assetStream, appId).extractAssetContents();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
index 191453b..3449f9e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
@@ -30,13 +30,10 @@ public class PipelineElementEndpointGenerator {
private static final Logger LOG = LoggerFactory.getLogger(PipelineElementEndpointGenerator.class);
private String appId;
- private String elementId;
private PipelineElementUrl pipelineElementUrl;
- public PipelineElementEndpointGenerator(String elementId,
- String appId,
+ public PipelineElementEndpointGenerator(String appId,
PipelineElementUrl pipelineElementUrl) {
- this.elementId = elementId;
this.appId = appId;
this.pipelineElementUrl = pipelineElementUrl;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index 7ac5c72..8cae245 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -80,7 +80,7 @@ public class PipelineExecutor {
decryptSecrets(graphs);
- graphs.forEach(g -> g.setSelectedEndpointUrl(new PipelineElementEndpointGenerator(g.getElementId(),
+ graphs.forEach(g -> g.setSelectedEndpointUrl(new PipelineElementEndpointGenerator(
g.getAppId(),
getPipelineElementType(g))
.getEndpointResourceUrl()));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
index 95382ef..e18fd32 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.manager.verification;
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.graph.DataProcessorDescription;
@@ -66,7 +67,7 @@ public class DataProcessorVerifier extends ElementVerifier<DataProcessorDescript
@Override
protected void storeAssets() throws IOException {
if (elementDescription.isIncludesAssets()) {
- AssetManager.storeAsset(elementDescription.getElementId(), elementDescription.getAppId());
+ AssetManager.storeAsset(PipelineElementUrl.DATA_PROCESSOR, elementDescription.getAppId());
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
index 2f3e9b6..a8c6060 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.manager.verification;
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.graph.DataSinkDescription;
@@ -75,7 +76,7 @@ public class DataSinkVerifier extends ElementVerifier<DataSinkDescription> {
@Override
protected void storeAssets() throws IOException {
if (elementDescription.isIncludesAssets()) {
- AssetManager.storeAsset(elementDescription.getElementId(), elementDescription.getAppId());
+ AssetManager.storeAsset(PipelineElementUrl.DATA_SINK, elementDescription.getAppId());
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
index a87b1b2..b7d1b90 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.manager.verification;
+import org.apache.streampipes.commons.constants.PipelineElementUrl;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.SpDataStream;
@@ -72,7 +73,7 @@ public class DataStreamVerifier extends ElementVerifier<SpDataStream> {
@Override
protected void storeAssets() throws IOException {
if (elementDescription.isIncludesAssets()) {
- AssetManager.storeAsset(elementDescription.getElementId(), elementDescription.getAppId());
+ AssetManager.storeAsset(PipelineElementUrl.DATA_STREAM, elementDescription.getAppId());
}
}