You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/06/01 15:38:48 UTC
[incubator-streampipes-extensions] branch edge-extensions updated:
Revert "[STREAMPIPES-368] Remove dependency to VisualizablePipeline storage
from Dashboard sink"
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 7e26283 Revert "[STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink"
7e26283 is described below
commit 7e262830f2be1b05b799f7de12c809a30f9a9316
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri May 28 13:36:28 2021 +0200
Revert "[STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink"
This reverts commit b4dd7a43
---
.../sinks/internal/jvm/dashboard/Dashboard.java | 42 +++++++++++++
.../jvm/dashboard/DashboardController.java | 4 +-
.../internal/jvm/dashboard/DashboardModel.java | 71 ++++++++++++++++++++++
.../jvm/dashboard/DashboardParameters.java | 18 +++++-
4 files changed, 133 insertions(+), 2 deletions(-)
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index 9417a6c..c4dd025 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -18,10 +18,14 @@
package org.apache.streampipes.sinks.internal.jvm.dashboard;
+import org.lightcouch.CouchDbClient;
+import org.lightcouch.CouchDbProperties;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.serializers.json.GsonSerializer;
import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -31,16 +35,31 @@ public class Dashboard implements EventSink<DashboardParameters> {
private ActiveMQPublisher publisher;
private JsonDataFormatDefinition jsonDataFormatDefinition;
+ private static String DB_NAME = "visualizablepipeline";
+ private static int DB_PORT = SinksInternalJvmConfig.INSTANCE.getCouchDbPort();
+ private static String DB_HOST = SinksInternalJvmConfig.INSTANCE.getCouchDbHost();
+ private static String DB_PROTOCOL = "http";
+
+
+ private String visualizationId;
+ private String visualizationRev;
+ private String pipelineId;
+
+
public Dashboard() { // Creates the sink with a fresh JSON data format definition; the publisher is created later in onInvocation.
this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
}
@Override
public void onInvocation(DashboardParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException { // Stores the dashboard model in CouchDB, then creates the JMS publisher; runtimeContext is not used here.
+ if (!saveToCouchDB(parameters.getGraph(), parameters)) { // abort the invocation when the save response carries an error
+ throw new SpRuntimeException("The schema couldn't be stored in the couchDB");
+ }
this.publisher = new ActiveMQPublisher( // configured with the JMS host/port from SinksInternalJvmConfig and this sink's element id
SinksInternalJvmConfig.INSTANCE.getJmsHost(),
SinksInternalJvmConfig.INSTANCE.getJmsPort(),
parameters.getElementId());
+ this.pipelineId = parameters.getPipelineId(); // remembered for the error message raised in onDetach
}
@Override
@@ -52,8 +71,31 @@ public class Dashboard implements EventSink<DashboardParameters> {
}
}
+ /**
+  * Saves a {@link DashboardModel} built from the given parameters to the CouchDB database
+  * and, on success, remembers the id and revision of the created document so it can be
+  * removed again later.
+  *
+  * @param invocationGraph the sink invocation (currently unused, kept for interface stability)
+  * @param params          the parameters the stored model is derived from
+  * @return {@code true} if the save response reports no error, {@code false} otherwise
+  */
+ private boolean saveToCouchDB(DataSinkInvocation invocationGraph, DashboardParameters params) {
+     CouchDbClient dbClient = new CouchDbClient(new CouchDbProperties(DB_NAME, true, DB_PROTOCOL, DB_HOST, DB_PORT, null, null));
+     try {
+         dbClient.setGsonBuilder(GsonSerializer.getGsonBuilder());
+         org.lightcouch.Response res = dbClient.save(DashboardModel.from(params));
+
+         boolean saved = res.getError() == null;
+         if (saved) {
+             visualizationId = res.getId();
+             visualizationRev = res.getRev();
+         }
+         return saved;
+     } finally {
+         // A new client is created per call; release its HTTP resources even if save() throws.
+         dbClient.shutdown();
+     }
+ }
+
+ /**
+  * Removes the document previously stored by {@code saveToCouchDB}, identified by the
+  * remembered id and revision.
+  *
+  * @return {@code true} if the remove response reports no error, {@code false} otherwise
+  */
+ private boolean removeFromCouchDB() {
+     CouchDbClient dbClient = new CouchDbClient(new CouchDbProperties(DB_NAME, true, DB_PROTOCOL, DB_HOST, DB_PORT, null, null));
+     try {
+         org.lightcouch.Response res = dbClient.remove(visualizationId, visualizationRev);
+         return res.getError() == null;
+     } finally {
+         // A new client is created per call; release its HTTP resources even if remove() throws.
+         dbClient.shutdown();
+     }
+ }
+
@Override
public void onDetach() throws SpRuntimeException { // Disconnects the JMS publisher, then removes this sink's document from CouchDB.
this.publisher.disconnect();
+ if (!removeFromCouchDB()) { // NOTE(review): not reached if disconnect() throws — confirm whether the removal should still run in that case
+ throw new SpRuntimeException("There was an error while deleting pipeline: '" + pipelineId + "'");
+ }
}
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
index d5c2a16..86670ff 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
@@ -48,7 +48,9 @@ public class DashboardController extends StandaloneEventSinkDeclarer<DashboardPa
@Override
public ConfiguredEventSink<DashboardParameters> onInvocation(DataSinkInvocation invocationGraph, // Builds the configured Dashboard sink for one invocation.
DataSinkParameterExtractor extractor) {
- return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph), Dashboard::new);
+ String visualizationName = extractor.singleValueParameter(VISUALIZATION_NAME_KEY, // read the value configured under VISUALIZATION_NAME_KEY as a String
+ String.class);
+ return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph, visualizationName), Dashboard::new); // parameters now carry the visualization name as well
}
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java
new file mode 100644
index 0000000..517d36f
--- /dev/null
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.dashboard;
+
+import org.apache.streampipes.model.schema.EventSchema;
+
+/**
+ * Data holder describing one dashboard visualization: the pipeline id, the event schema,
+ * the visualization name and the topic. {@code Dashboard} passes instances of this class
+ * to the CouchDB client for storage.
+ */
+public class DashboardModel {
+
+  private String pipelineId;
+  private EventSchema schema;
+  private String visualizationName;
+  private String topic;
+
+  /**
+   * Creates a model populated from the given sink parameters.
+   * The topic is taken from the parameters' element id.
+   *
+   * @param params source of pipeline id, schema, visualization name and element id
+   * @return a new, fully populated model
+   */
+  public static DashboardModel from(DashboardParameters params) {
+    final DashboardModel dashboardModel = new DashboardModel();
+
+    dashboardModel.setTopic(params.getElementId());
+    dashboardModel.setVisualizationName(params.getVisualizationName());
+    dashboardModel.setSchema(params.getSchema());
+    dashboardModel.setPipelineId(params.getPipelineId());
+
+    return dashboardModel;
+  }
+
+  /** @return the pipeline id */
+  public String getPipelineId() {
+    return this.pipelineId;
+  }
+
+  /** @param pipelineId the pipeline id to set */
+  public void setPipelineId(String pipelineId) {
+    this.pipelineId = pipelineId;
+  }
+
+  /** @return the event schema */
+  public EventSchema getSchema() {
+    return this.schema;
+  }
+
+  /** @param schema the event schema to set */
+  public void setSchema(EventSchema schema) {
+    this.schema = schema;
+  }
+
+  /** @return the visualization name */
+  public String getVisualizationName() {
+    return this.visualizationName;
+  }
+
+  /** @param visualizationName the visualization name to set */
+  public void setVisualizationName(String visualizationName) {
+    this.visualizationName = visualizationName;
+  }
+
+  /** @return the topic */
+  public String getTopic() {
+    return this.topic;
+  }
+
+  /** @param topic the topic to set */
+  public void setTopic(String topic) {
+    this.topic = topic;
+  }
+}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
index 29d66c6..8899289 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
@@ -23,17 +23,33 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
public class DashboardParameters extends EventSinkBindingParams {
+ private String pipelineId;
private String elementId;
private EventSchema schema;
private String visualizationName;
- public DashboardParameters(DataSinkInvocation invocationGraph) {
+ public DashboardParameters(DataSinkInvocation invocationGraph, String visualizationName) { // Captures schema, pipeline id, visualization name and shortened element id from the invocation.
super(invocationGraph);
+ this.schema = invocationGraph.getInputStreams().get(0).getEventSchema(); // schema of the first input stream; assumes at least one input stream exists — TODO confirm
+ this.pipelineId = invocationGraph.getCorrespondingPipeline();
+ this.visualizationName = visualizationName;
this.elementId = invocationGraph.getElementId();
this.elementId = this.elementId.substring(this.elementId.lastIndexOf("/") + 1); // keep only the part after the last '/'
}
+ /** @return the pipeline id read from the invocation graph's corresponding pipeline */
+ public String getPipelineId() {
+     return this.pipelineId;
+ }
+
+ /** @return the event schema taken from the invocation's first input stream */
+ public EventSchema getSchema() {
+     return this.schema;
+ }
+
+ /** @return the visualization name passed to the constructor */
+ public String getVisualizationName() {
+     return this.visualizationName;
+ }
+
public String getElementId() { // Returns the element id shortened by the constructor to the part after its last '/'.
return elementId;
}