You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/23 22:01:39 UTC
[incubator-streampipes-extensions] branch dev updated:
[STREAMPIPES-373] Modify output topic of dashboard sink
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 981c953 [STREAMPIPES-373] Modify output topic of dashboard sink
981c953 is described below
commit 981c9536bf573b0970e156e35d488671b162249a
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 24 00:01:25 2021 +0200
[STREAMPIPES-373] Modify output topic of dashboard sink
---
.../sinks/internal/jvm/dashboard/Dashboard.java | 16 ++++++++++++++--
.../internal/jvm/dashboard/DashboardController.java | 3 ++-
.../internal/jvm/dashboard/DashboardParameters.java | 14 +++++---------
3 files changed, 21 insertions(+), 12 deletions(-)
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index 9417a6c..91564c7 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sinks.internal.jvm.dashboard;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
@@ -36,11 +37,22 @@ public class Dashboard implements EventSink<DashboardParameters> {
}
@Override
- public void onInvocation(DashboardParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ public void onInvocation(DashboardParameters parameters,
+ EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
this.publisher = new ActiveMQPublisher(
SinksInternalJvmConfig.INSTANCE.getJmsHost(),
SinksInternalJvmConfig.INSTANCE.getJmsPort(),
- parameters.getElementId());
+ makeTopic(parameters.getGraph().getInputStreams().get(0), parameters.getVisualizationName()));
+ }
+
+ private String makeTopic(SpDataStream inputStream, String visualizationName) {
+ return extractTopic(inputStream)
+ + "-"
+ + visualizationName.replaceAll(" ", "").toLowerCase();
+ }
+
+ private String extractTopic(SpDataStream inputStream) {
+ return inputStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
}
@Override
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
index d5c2a16..d30ce78 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
@@ -48,7 +48,8 @@ public class DashboardController extends StandaloneEventSinkDeclarer<DashboardPa
@Override
public ConfiguredEventSink<DashboardParameters> onInvocation(DataSinkInvocation invocationGraph,
DataSinkParameterExtractor extractor) {
- return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph), Dashboard::new);
+ String visualizationName = extractor.singleValueParameter(VISUALIZATION_NAME_KEY, String.class);
+ return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph, visualizationName), Dashboard::new);
}
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
index 29d66c6..feb61b7 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
@@ -19,22 +19,18 @@
package org.apache.streampipes.sinks.internal.jvm.dashboard;
import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
public class DashboardParameters extends EventSinkBindingParams {
- private String elementId;
- private EventSchema schema;
private String visualizationName;
- public DashboardParameters(DataSinkInvocation invocationGraph) {
+ public DashboardParameters(DataSinkInvocation invocationGraph,
+ String visualizationName) {
super(invocationGraph);
-
- this.elementId = invocationGraph.getElementId();
- this.elementId = this.elementId.substring(this.elementId.lastIndexOf("/") + 1);
+ this.visualizationName = visualizationName;
}
- public String getElementId() {
- return elementId;
+ public String getVisualizationName() {
+ return visualizationName;
}
}