You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/23 22:00:43 UTC

[incubator-streampipes] 02/02: [STREAMPIPES-373] Modify expected input stream for dashboard sinks

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 7538b9a850464df6196c1bff74f51030c4ae4409
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 24 00:00:14 2021 +0200

    [STREAMPIPES-373] Modify expected input stream for dashboard sinks
---
 .../execution/http/PipelineStorageService.java     | 70 +++++++++++-----------
 .../manager/health/PipelineHealthCheck.java        |  4 +-
 .../rest/impl/dashboard/VisualizablePipeline.java  | 21 ++++++-
 3 files changed, 56 insertions(+), 39 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
index d59a4b3..b80ec80 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
@@ -33,48 +33,48 @@ import java.util.stream.Collectors;
 
 public class PipelineStorageService {
 
-    private Pipeline pipeline;
+  private Pipeline pipeline;
 
-    public PipelineStorageService(Pipeline pipeline) {
-        this.pipeline = pipeline;
-    }
+  public PipelineStorageService(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
 
-    public void updatePipeline() {
-     encryptSecrets(pipeline);
-     StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
-    }
+  public void updatePipeline() {
+    preparePipeline();
+    StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+  }
 
-    public void addPipeline() {
-        preparePipeline();
-        StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().store(pipeline);
-    }
+  public void addPipeline() {
+    preparePipeline();
+    StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().store(pipeline);
+  }
 
-    private void preparePipeline() {
-        PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-        InvocationGraphBuilder builder = new InvocationGraphBuilder(pipelineGraph, pipeline.getPipelineId());
-        List<InvocableStreamPipesEntity> graphs = builder.buildGraphs();
-        encryptSecrets(graphs);
+  private void preparePipeline() {
+    PipelineGraph pipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
+    InvocationGraphBuilder builder = new InvocationGraphBuilder(pipelineGraph, pipeline.getPipelineId());
+    List<InvocableStreamPipesEntity> graphs = builder.buildGraphs();
+    encryptSecrets(graphs);
 
-        List<DataSinkInvocation> secs = filter(graphs, DataSinkInvocation.class);
-        List<DataProcessorInvocation> sepas = filter(graphs, DataProcessorInvocation.class);
+    List<DataSinkInvocation> secs = filter(graphs, DataSinkInvocation.class);
+    List<DataProcessorInvocation> sepas = filter(graphs, DataProcessorInvocation.class);
 
-        pipeline.setSepas(sepas);
-        pipeline.setActions(secs);
-    }
+    pipeline.setSepas(sepas);
+    pipeline.setActions(secs);
+  }
 
-    private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-        SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
-    }
+  private void encryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+    SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(graphs);
+  }
 
-    private void encryptSecrets(Pipeline pipeline) {
-        SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(pipeline);
-    }
+  private void encryptSecrets(Pipeline pipeline) {
+    SecretProvider.getEncryptionService(pipeline.getCreatedByUser()).apply(pipeline);
+  }
 
-    private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
-        return graphs
-                .stream()
-                .filter(clazz::isInstance)
-                .map(clazz::cast)
-                .collect(Collectors.toList());
-    }
+  private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
+    return graphs
+            .stream()
+            .filter(clazz::isInstance)
+            .map(clazz::cast)
+            .collect(Collectors.toList());
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 0bbde50..245110e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -33,8 +33,6 @@ import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import static org.apache.streampipes.manager.operations.Operations.updatePipeline;
-
 public class PipelineHealthCheck implements Runnable {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
@@ -77,7 +75,7 @@ public class PipelineHealthCheck implements Runnable {
                 LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(), pipeline.getName());
               }
               pipeline.setPipelineNotifications(pipelineNotifications);
-              updatePipeline(pipeline);
+              StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
             }
           }
         });
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
index d26881d..648d454 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/dashboard/VisualizablePipeline.java
@@ -80,7 +80,7 @@ public class VisualizablePipeline extends AbstractRestResource {
                 visualizablePipeline.setPipelineName(pipeline.getName());
                 visualizablePipeline.setVisualizationName(extractVisualizationName(sink));
                 visualizablePipeline.setSchema(sink.getInputStreams().get(0).getEventSchema());
-                visualizablePipeline.setTopic(sink.getElementId().substring(sink.getElementId().lastIndexOf(Slash) + 1));
+                visualizablePipeline.setTopic(makeTopic(sink));
 
                 visualizablePipelines.add(visualizablePipeline);
               });
@@ -89,6 +89,25 @@ public class VisualizablePipeline extends AbstractRestResource {
      return visualizablePipelines;
   }
 
+  private String makeTopic(DataSinkInvocation sink) {
+    return extractInputTopic(sink) + "-" + normalize(extractVisualizationName(sink));
+  }
+
+  private String extractInputTopic(DataSinkInvocation sink) {
+    return sink
+            .getInputStreams()
+            .get(0)
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .getActualTopicName();
+  }
+
+  private String normalize(String visualizationName) {
+    return visualizationName.replaceAll(" ", "").toLowerCase();
+  }
+
+
   private String extractVisualizationName(DataSinkInvocation sink) {
     return sink.getStaticProperties()
             .stream()