You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/23 22:01:39 UTC

[incubator-streampipes-extensions] branch dev updated: [STREAMPIPES-373] Modify output topic of dashboard sink

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 981c953  [STREAMPIPES-373] Modify output topic of dashboard sink
981c953 is described below

commit 981c9536bf573b0970e156e35d488671b162249a
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 24 00:01:25 2021 +0200

    [STREAMPIPES-373] Modify output topic of dashboard sink
---
 .../sinks/internal/jvm/dashboard/Dashboard.java          | 16 ++++++++++++++--
 .../internal/jvm/dashboard/DashboardController.java      |  3 ++-
 .../internal/jvm/dashboard/DashboardParameters.java      | 14 +++++---------
 3 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index 9417a6c..91564c7 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sinks.internal.jvm.dashboard;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
@@ -36,11 +37,22 @@ public class Dashboard implements EventSink<DashboardParameters> {
     }
 
     @Override
-    public void onInvocation(DashboardParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+    public void onInvocation(DashboardParameters parameters,
+                             EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
         this.publisher = new ActiveMQPublisher(
                 SinksInternalJvmConfig.INSTANCE.getJmsHost(),
                 SinksInternalJvmConfig.INSTANCE.getJmsPort(),
-                parameters.getElementId());
+                makeTopic(parameters.getGraph().getInputStreams().get(0), parameters.getVisualizationName()));
+    }
+
+    private String makeTopic(SpDataStream inputStream, String visualizationName) {
+        return extractTopic(inputStream)
+                + "-"
+                + visualizationName.replaceAll(" ", "").toLowerCase();
+    }
+
+    private String extractTopic(SpDataStream inputStream) {
+        return inputStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
     }
 
     @Override
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
index d5c2a16..d30ce78 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
@@ -48,7 +48,8 @@ public class DashboardController extends StandaloneEventSinkDeclarer<DashboardPa
   @Override
   public ConfiguredEventSink<DashboardParameters> onInvocation(DataSinkInvocation invocationGraph,
                                                                DataSinkParameterExtractor extractor) {
-    return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph), Dashboard::new);
+    String visualizationName = extractor.singleValueParameter(VISUALIZATION_NAME_KEY, String.class);
+    return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph, visualizationName), Dashboard::new);
   }
 
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
index 29d66c6..feb61b7 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
@@ -19,22 +19,18 @@
 package org.apache.streampipes.sinks.internal.jvm.dashboard;
 
 import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
 public class DashboardParameters extends EventSinkBindingParams {
-    private String elementId;
-    private EventSchema schema;
     private String visualizationName;
 
-    public DashboardParameters(DataSinkInvocation invocationGraph) {
+    public DashboardParameters(DataSinkInvocation invocationGraph,
+                               String visualizationName) {
         super(invocationGraph);
-
-        this.elementId = invocationGraph.getElementId();
-        this.elementId = this.elementId.substring(this.elementId.lastIndexOf("/") + 1);
+        this.visualizationName = visualizationName;
     }
 
-    public String getElementId() {
-        return elementId;
+    public String getVisualizationName() {
+        return visualizationName;
     }
 }