You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/21 10:11:29 UTC
[incubator-streampipes-extensions] branch dev updated: [STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new b4dd7a4 [STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink
new 1977d17 Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
b4dd7a4 is described below
commit b4dd7a43f1b9e10ec176e6bc09018523765949f3
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Fri May 21 12:11:07 2021 +0200
[STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink
---
.../sinks/internal/jvm/dashboard/Dashboard.java | 42 -------------
.../jvm/dashboard/DashboardController.java | 4 +-
.../internal/jvm/dashboard/DashboardModel.java | 71 ----------------------
.../jvm/dashboard/DashboardParameters.java | 18 +-----
4 files changed, 2 insertions(+), 133 deletions(-)
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index c4dd025..9417a6c 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -18,14 +18,10 @@
package org.apache.streampipes.sinks.internal.jvm.dashboard;
-import org.lightcouch.CouchDbClient;
-import org.lightcouch.CouchDbProperties;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.serializers.json.GsonSerializer;
import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -35,31 +31,16 @@ public class Dashboard implements EventSink<DashboardParameters> {
private ActiveMQPublisher publisher;
private JsonDataFormatDefinition jsonDataFormatDefinition;
- private static String DB_NAME = "visualizablepipeline";
- private static int DB_PORT = SinksInternalJvmConfig.INSTANCE.getCouchDbPort();
- private static String DB_HOST = SinksInternalJvmConfig.INSTANCE.getCouchDbHost();
- private static String DB_PROTOCOL = "http";
-
-
- private String visualizationId;
- private String visualizationRev;
- private String pipelineId;
-
-
public Dashboard() {
this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
}
@Override
public void onInvocation(DashboardParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
- if (!saveToCouchDB(parameters.getGraph(), parameters)) {
- throw new SpRuntimeException("The schema couldn't be stored in the couchDB");
- }
this.publisher = new ActiveMQPublisher(
SinksInternalJvmConfig.INSTANCE.getJmsHost(),
SinksInternalJvmConfig.INSTANCE.getJmsPort(),
parameters.getElementId());
- this.pipelineId = parameters.getPipelineId();
}
@Override
@@ -71,31 +52,8 @@ public class Dashboard implements EventSink<DashboardParameters> {
}
}
- private boolean saveToCouchDB(DataSinkInvocation invocationGraph, DashboardParameters params) {
- CouchDbClient dbClient = new CouchDbClient(new CouchDbProperties(DB_NAME, true, DB_PROTOCOL, DB_HOST, DB_PORT, null, null));
- dbClient.setGsonBuilder(GsonSerializer.getGsonBuilder());
- org.lightcouch.Response res = dbClient.save(DashboardModel.from(params));
-
- if (res.getError() == null) {
- visualizationId = res.getId();
- visualizationRev = res.getRev();
- }
-
- return res.getError() == null;
- }
-
- private boolean removeFromCouchDB() {
- CouchDbClient dbClient = new CouchDbClient(new CouchDbProperties(DB_NAME, true, DB_PROTOCOL, DB_HOST, DB_PORT, null, null));
- org.lightcouch.Response res = dbClient.remove(visualizationId, visualizationRev);
-
- return res.getError() == null;
- }
-
@Override
public void onDetach() throws SpRuntimeException {
this.publisher.disconnect();
- if (!removeFromCouchDB()) {
- throw new SpRuntimeException("There was an error while deleting pipeline: '" + pipelineId + "'");
- }
}
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
index 86670ff..d5c2a16 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
@@ -48,9 +48,7 @@ public class DashboardController extends StandaloneEventSinkDeclarer<DashboardPa
@Override
public ConfiguredEventSink<DashboardParameters> onInvocation(DataSinkInvocation invocationGraph,
DataSinkParameterExtractor extractor) {
- String visualizationName = extractor.singleValueParameter(VISUALIZATION_NAME_KEY,
- String.class);
- return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph, visualizationName), Dashboard::new);
+ return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph), Dashboard::new);
}
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java
deleted file mode 100644
index 517d36f..0000000
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.dashboard;
-
-import org.apache.streampipes.model.schema.EventSchema;
-
-public class DashboardModel {
-
- private String pipelineId;
- private EventSchema schema;
- private String visualizationName;
- private String topic;
-
- public static DashboardModel from(DashboardParameters params) {
- DashboardModel model = new DashboardModel();
- model.setPipelineId(params.getPipelineId());
- model.setSchema(params.getSchema());
- model.setVisualizationName(params.getVisualizationName());
- model.setTopic(params.getElementId());
-
- return model;
- }
-
- public String getPipelineId() {
- return pipelineId;
- }
-
- public void setPipelineId(String pipelineId) {
- this.pipelineId = pipelineId;
- }
-
- public EventSchema getSchema() {
- return schema;
- }
-
- public void setSchema(EventSchema schema) {
- this.schema = schema;
- }
-
- public String getVisualizationName() {
- return visualizationName;
- }
-
- public void setVisualizationName(String visualizationName) {
- this.visualizationName = visualizationName;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
index 8899289..29d66c6 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
@@ -23,33 +23,17 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
public class DashboardParameters extends EventSinkBindingParams {
- private String pipelineId;
private String elementId;
private EventSchema schema;
private String visualizationName;
- public DashboardParameters(DataSinkInvocation invocationGraph, String visualizationName) {
+ public DashboardParameters(DataSinkInvocation invocationGraph) {
super(invocationGraph);
- this.schema = invocationGraph.getInputStreams().get(0).getEventSchema();
- this.pipelineId = invocationGraph.getCorrespondingPipeline();
- this.visualizationName = visualizationName;
this.elementId = invocationGraph.getElementId();
this.elementId = this.elementId.substring(this.elementId.lastIndexOf("/") + 1);
}
- public String getPipelineId() {
- return pipelineId;
- }
-
- public EventSchema getSchema() {
- return schema;
- }
-
- public String getVisualizationName() {
- return visualizationName;
- }
-
public String getElementId() {
return elementId;
}