You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/06/01 15:38:48 UTC

[incubator-streampipes-extensions] branch edge-extensions updated: Revert "[STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink"

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 7e26283  Revert "[STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink"
7e26283 is described below

commit 7e262830f2be1b05b799f7de12c809a30f9a9316
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri May 28 13:36:28 2021 +0200

    Revert "[STREAMPIPES-368] Remove dependency to VisualizablePipeline storage from Dashboard sink"
    
    This reverts commit b4dd7a43
---
 .../sinks/internal/jvm/dashboard/Dashboard.java    | 42 +++++++++++++
 .../jvm/dashboard/DashboardController.java         |  4 +-
 .../internal/jvm/dashboard/DashboardModel.java     | 71 ++++++++++++++++++++++
 .../jvm/dashboard/DashboardParameters.java         | 18 +++++-
 4 files changed, 133 insertions(+), 2 deletions(-)

diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index 9417a6c..c4dd025 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -18,10 +18,14 @@
 
 package org.apache.streampipes.sinks.internal.jvm.dashboard;
 
+import org.lightcouch.CouchDbClient;
+import org.lightcouch.CouchDbProperties;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.serializers.json.GsonSerializer;
 import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -31,16 +35,31 @@ public class Dashboard implements EventSink<DashboardParameters> {
     private ActiveMQPublisher publisher;
     private JsonDataFormatDefinition jsonDataFormatDefinition;
 
+    private static final String DB_NAME = "visualizablepipeline"; // CouchDB database the dashboard model is stored in
+    private static final int DB_PORT = SinksInternalJvmConfig.INSTANCE.getCouchDbPort(); // read once from the sink configuration
+    private static final String DB_HOST = SinksInternalJvmConfig.INSTANCE.getCouchDbHost(); // read once from the sink configuration
+    private static final String DB_PROTOCOL = "http";
+
+
+    private String visualizationId; // id of the stored CouchDB document, assigned in saveToCouchDB
+    private String visualizationRev; // revision of the stored CouchDB document, assigned in saveToCouchDB
+    private String pipelineId; // assigned in onInvocation, used in the onDetach error message
+
+
     public Dashboard() {
         this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
     }
 
     @Override
     public void onInvocation(DashboardParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+        if (!saveToCouchDB(parameters.getGraph(), parameters)) { // store the dashboard model before connecting the publisher; an error response aborts the invocation
+            throw new SpRuntimeException("The schema couldn't be stored in the couchDB");
+        }
         this.publisher = new ActiveMQPublisher(
                 SinksInternalJvmConfig.INSTANCE.getJmsHost(),
                 SinksInternalJvmConfig.INSTANCE.getJmsPort(),
                 parameters.getElementId());
+        this.pipelineId = parameters.getPipelineId(); // remembered for the onDetach error message; NOTE(review): if the publisher constructor throws, the document stored above is not removed - confirm this is acceptable
     }
 
     @Override
@@ -52,8 +71,31 @@ public class Dashboard implements EventSink<DashboardParameters> {
         }
     }
 
+    /**
+     * Stores a {@link DashboardModel} built from the given parameters in the
+     * CouchDB database named by {@code DB_NAME} and remembers the id and
+     * revision of the created document in {@code visualizationId} and
+     * {@code visualizationRev}.
+     *
+     * @param invocationGraph the sink invocation; not used by this method, kept so the signature stays unchanged
+     * @param params          parameters the persisted model is derived from
+     * @return {@code true} if the save response carries no error, {@code false} otherwise
+     */
+    private boolean saveToCouchDB(DataSinkInvocation invocationGraph, DashboardParameters params) {
+        CouchDbClient dbClient = new CouchDbClient(new CouchDbProperties(DB_NAME, true, DB_PROTOCOL, DB_HOST, DB_PORT, null, null));
+        try {
+            dbClient.setGsonBuilder(GsonSerializer.getGsonBuilder());
+            org.lightcouch.Response res = dbClient.save(DashboardModel.from(params));
+            if (res.getError() != null) {
+                return false;
+            }
+            visualizationId = res.getId();
+            visualizationRev = res.getRev();
+            return true;
+        } finally {
+            // A fresh client is created on every call, so release its connections once the request is done.
+            dbClient.shutdown();
+        }
+    }
+
+    /**
+     * Removes the document identified by {@code visualizationId} and
+     * {@code visualizationRev} from the CouchDB database named by {@code DB_NAME}.
+     *
+     * @return {@code true} if the remove response carries no error, {@code false} otherwise
+     */
+    private boolean removeFromCouchDB() {
+        CouchDbClient dbClient = new CouchDbClient(new CouchDbProperties(DB_NAME, true, DB_PROTOCOL, DB_HOST, DB_PORT, null, null));
+        try {
+            org.lightcouch.Response res = dbClient.remove(visualizationId, visualizationRev);
+            return res.getError() == null;
+        } finally {
+            // A fresh client is created on every call, so release its connections once the request is done.
+            dbClient.shutdown();
+        }
+    }
+
     @Override
     public void onDetach() throws SpRuntimeException {
         this.publisher.disconnect();
+        if (!removeFromCouchDB()) { // delete the document stored in onInvocation; NOTE(review): an exception from disconnect() above would skip this removal
+            throw new SpRuntimeException("There was an error while deleting pipeline: '" + pipelineId + "'");
+        }
     }
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
index d5c2a16..86670ff 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
@@ -48,7 +48,9 @@ public class DashboardController extends StandaloneEventSinkDeclarer<DashboardPa
   @Override
   public ConfiguredEventSink<DashboardParameters> onInvocation(DataSinkInvocation invocationGraph,
                                                                DataSinkParameterExtractor extractor) {
-    return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph), Dashboard::new);
+    String visualizationName = extractor.singleValueParameter(VISUALIZATION_NAME_KEY, // read the single-value parameter registered under VISUALIZATION_NAME_KEY as a String
+            String.class);
+    return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph, visualizationName), Dashboard::new); // the name is handed to the sink through its parameters
   }
 
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java
new file mode 100644
index 0000000..517d36f
--- /dev/null
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardModel.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.dashboard;
+
+import org.apache.streampipes.model.schema.EventSchema;
+
+/**
+ * Data holder for a dashboard visualization: the id of the pipeline it belongs to,
+ * the event schema, the visualization name and the topic.
+ */
+public class DashboardModel {
+
+  private String pipelineId;
+  private EventSchema schema;
+  private String visualizationName;
+  private String topic;
+
+  /**
+   * Creates a model populated from the given sink parameters.
+   *
+   * @param params source of the pipeline id, schema, visualization name and element id
+   * @return a new model whose topic is the element id of {@code params}
+   */
+  public static DashboardModel from(DashboardParameters params) {
+    DashboardModel result = new DashboardModel();
+    result.pipelineId = params.getPipelineId();
+    result.schema = params.getSchema();
+    result.visualizationName = params.getVisualizationName();
+    result.topic = params.getElementId();
+    return result;
+  }
+
+  /** @return the pipeline id */
+  public String getPipelineId() {
+    return this.pipelineId;
+  }
+
+  /** @return the event schema */
+  public EventSchema getSchema() {
+    return this.schema;
+  }
+
+  /** @return the visualization name */
+  public String getVisualizationName() {
+    return this.visualizationName;
+  }
+
+  /** @return the topic */
+  public String getTopic() {
+    return this.topic;
+  }
+
+  /** @param pipelineId the pipeline id to set */
+  public void setPipelineId(String pipelineId) {
+    this.pipelineId = pipelineId;
+  }
+
+  /** @param schema the event schema to set */
+  public void setSchema(EventSchema schema) {
+    this.schema = schema;
+  }
+
+  /** @param visualizationName the visualization name to set */
+  public void setVisualizationName(String visualizationName) {
+    this.visualizationName = visualizationName;
+  }
+
+  /** @param topic the topic to set */
+  public void setTopic(String topic) {
+    this.topic = topic;
+  }
+}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
index 29d66c6..8899289 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
@@ -23,17 +23,33 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 
 public class DashboardParameters extends EventSinkBindingParams {
+    private String pipelineId;
     private String elementId;
     private EventSchema schema;
     private String visualizationName;
 
-    public DashboardParameters(DataSinkInvocation invocationGraph) {
+    public DashboardParameters(DataSinkInvocation invocationGraph, String visualizationName) { // visualizationName is stored as given
         super(invocationGraph);
+        this.schema = invocationGraph.getInputStreams().get(0).getEventSchema(); // schema of the first input stream; NOTE(review): get(0) assumes at least one input stream - confirm
+        this.pipelineId = invocationGraph.getCorrespondingPipeline();
+        this.visualizationName = visualizationName;
 
         this.elementId = invocationGraph.getElementId();
         this.elementId = this.elementId.substring(this.elementId.lastIndexOf("/") + 1);
     }
 
+    public String getPipelineId() { // value the constructor read from invocationGraph.getCorrespondingPipeline()
+        return pipelineId;
+    }
+
+    public EventSchema getSchema() { // event schema the constructor took from the first input stream
+        return schema;
+    }
+
+    public String getVisualizationName() { // name passed to the constructor
+        return visualizationName;
+    }
+
     public String getElementId() {
         return elementId;
     }