You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/23 22:00:43 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-373] Modify expected input stream for dashboard sinks
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 7538b9a850464df6196c1bff74f51030c4ae4409
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 24 00:00:14 2021 +0200
[STREAMPIPES-373] Modify expected input stream for dashboard sinks
---
.../execution/http/PipelineStorageService.java | 70 +++++++++++-----------
.../manager/health/PipelineHealthCheck.java | 4 +-
.../rest/impl/dashboard/VisualizablePipeline.java | 21 ++++++-
3 files changed, 56 insertions(+), 39 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
index d59a4b3..b80ec80 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
@@ -33,48 +33,48 @@ import java.util.stream.Collectors;
public class PipelineStorageService {
- private Pipeline pipeline;
+ private Pipeline pipeline;
- public PipelineStorageService(Pipeline pipeline) {
- this.pipeline = pipeline;
- }
+ public PipelineStorageService(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
- public void updatePipeline() {
- encryptSecrets(pipeline);
- StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
- }
+ public void updatePipeline() {
+ preparePipeline();
+ StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+ }
- public void addPipeline() {
- preparePipeline();
- StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().store(pipeline);
- }
+ public void addPipeline() {
+ preparePipeline();
+ StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().store(pipeline);
+ }
- private void preparePipeline() {
- PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
- InvocationGraphBuilder builder = new InvocationGraphBuilder(pipelineGraph, pipeline.getPipelineId());
- List<InvocableStreamPipesEntity> graphs = builder.buildGraphs();
- encryptSecrets(graphs);
+ private void preparePipeline() {
+ PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
+ InvocationGraphBuilder builder = new InvocationGraphBuilder(pipelineGraph, pipeline.getPipelineId());
+ List<InvocableStreamPipesEntity> graphs = builder.buildGraphs();
+ encryptSecrets(graphs);
- List<DataSinkInvocation> secs = filter(graphs, DataSinkInvocation.class);
- List<DataProcessorInvocation> sepas = filter(graphs, DataProcessorInvocation.class);
+ List<DataSinkInvocation> secs = filter(graphs, DataSinkInvocation.class);
+ List<DataProcessorInvocation> sepas = filter(graphs, DataProcessorInvocation.class);
- pipeline.setSepas(sepas);
- pipeline.setActions(secs);
- }
+ pipeline.setSepas(sepas);
+ pipeline.setActions(secs);
+ }
- private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
- SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
- }
+ private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+ SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
+ }
- private void encryptSecrets(Pipeline pipeline) {
- SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(pipeline);
- }
+ private void encryptSecrets(Pipeline pipeline) {
+ SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(pipeline);
+ }
- private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
- return graphs
- .stream()
- .filter(clazz::isInstance)
- .map(clazz::cast)
- .collect(Collectors.toList());
- }
+ private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
+ return graphs
+ .stream()
+ .filter(clazz::isInstance)
+ .map(clazz::cast)
+ .collect(Collectors.toList());
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 0bbde50..245110e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -33,8 +33,6 @@ import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
-import static org.apache.streampipes.manager.operations.Operations.updatePipeline;
-
public class PipelineHealthCheck implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
@@ -77,7 +75,7 @@ public class PipelineHealthCheck implements Runnable {
LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(), pipeline.getName());
}
pipeline.setPipelineNotifications(pipelineNotifications);
- updatePipeline(pipeline);
+ StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
}
}
});
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
index d26881d..648d454 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
@@ -80,7 +80,7 @@ public class VisualizablePipeline extends AbstractRestResource {
visualizablePipeline.setPipelineName(pipeline.getName());
visualizablePipeline.setVisualizationName(extractVisualizationName(sink));
visualizablePipeline.setSchema(sink.getInputStreams().get(0).getEventSchema());
- visualizablePipeline.setTopic(sink.getElementId().substring(sink.getElementId().lastIndexOf(Slash) + 1));
+ visualizablePipeline.setTopic(makeTopic(sink));
visualizablePipelines.add(visualizablePipeline);
});
@@ -89,6 +89,25 @@ public class VisualizablePipeline extends AbstractRestResource {
return visualizablePipelines;
}
+ private String makeTopic(DataSinkInvocation sink) {
+ return extractInputTopic(sink) + "-" + normalize(extractVisualizationName(sink));
+ }
+
+ private String extractInputTopic(DataSinkInvocation sink) {
+ return sink
+ .getInputStreams()
+ .get(0)
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+
+ private String normalize(String visualizationName) {
+ return visualizationName.replaceAll(" ", "").toLowerCase();
+ }
+
+
private String extractVisualizationName(DataSinkInvocation sink) {
return sink.getStaticProperties()
.stream()