You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/10/13 15:56:39 UTC

[incubator-streampipes] branch edge-extensions updated: [hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new e715c66  [hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters
e715c66 is described below

commit e715c6637c2e3d4dcecd6af9801b1c7f60b7ada5
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Oct 13 17:56:31 2021 +0200

    [hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters
---
 .../streampipes/connect/adapter/Adapter.java       |  9 +--
 .../connect/adapter/GroundingService.java          | 26 ++------
 .../connect/adapter/NodeControllerService.java     | 74 ++++++++++++++++++++++
 .../docker/AbstractStreamPipesDockerContainer.java |  2 +
 .../components/pipeline/pipeline.component.ts      |  7 +-
 ui/src/app/editor/services/jsplumb.service.ts      | 24 +++++--
 6 files changed, 106 insertions(+), 36 deletions(-)

diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index 4768e02..e14e3e9 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -168,7 +168,7 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
 
         if (adapterManagedByNodeController(adapterDescription)) {
 
-            if (isEdgeOrFogNodeTarget(adapterDescription)) {
+            if (NodeControllerService.isEdgeOrFogNode(adapterDescription)) {
                 return createSendToEdgeOrFogBrokerAdapterSink(adapterDescription);
             } else {
                 return createSendToCloudBrokerAdapterSink(adapterDescription);
@@ -253,11 +253,4 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
         return desc.getDeploymentTargetNodeId() != null && !desc.getDeploymentTargetNodeId().equals("default");
     }
 
-    private static boolean isEdgeOrFogNodeTarget(AdapterDescription desc) {
-        return NodeManagement.getInstance().getAllNodes().stream()
-                .filter(n -> n.getNodeControllerId().equals(desc.getDeploymentTargetNodeId()))
-                .anyMatch(n -> n.getStaticNodeMetadata().getType().equals("edge") ||
-                        n.getStaticNodeMetadata().getType().equals("fog"));
-    }
-
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
index 3194326..e864d1f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
@@ -93,7 +93,7 @@ public class GroundingService {
 
         if (adapterManagedByNodeController(adapterDescription)) {
 
-            if (isEdgeOrFogNodeTarget(adapterDescription)) {
+            if (NodeControllerService.isEdgeOrFogNode(adapterDescription)) {
                 createEdgeOrFogGrounding(eventGrounding, topicDefinition, adapterDescription);
             } else {
                 createCloudProtocol(eventGrounding, topicDefinition);
@@ -111,7 +111,7 @@ public class GroundingService {
 
     private static void createEdgeOrFogGrounding(EventGrounding eventGrounding, TopicDefinition topicDefinition,
                                                  AdapterDescription adapterDescription) throws AdapterException {
-        String nodeControllerId = extractNodeControllerId(adapterDescription);
+
         SpEdgeNodeProtocol edgeNodeProtocol = BackendConfig.INSTANCE
                 .getMessagingSettings()
                 .getPrioritizedEdgeProtocols()
@@ -119,14 +119,14 @@ public class GroundingService {
 
         if (isEdgeProtocol(edgeNodeProtocol, MqttTransportProtocol.class)) {
             MqttTransportProtocol brokerTransportProtocol =
-                    (MqttTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
+                    (MqttTransportProtocol) NodeControllerService.getNodeTransportProtocol(adapterDescription);
             brokerTransportProtocol.setTopicDefinition(topicDefinition);
 
             eventGrounding.setTransportProtocol(brokerTransportProtocol);
 
         } else if (isEdgeProtocol(edgeNodeProtocol, KafkaTransportProtocol.class)) {
             KafkaTransportProtocol brokerTransportProtocol =
-                    (KafkaTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
+                    (KafkaTransportProtocol) NodeControllerService.getNodeTransportProtocol(adapterDescription);
             brokerTransportProtocol.setTopicDefinition(topicDefinition);
 
             eventGrounding.setTransportProtocol(brokerTransportProtocol);
@@ -213,26 +213,8 @@ public class GroundingService {
         protocol.setTopicDefinition(topicDefinition);
     }
 
-    private static TransportProtocol getNodeBrokerTransportProtocol(String id) {
-        Optional<NodeInfoDescription> nodeInfoDescription = getNodeInfoDescriptionForId(id);
-        if (nodeInfoDescription.isPresent()) {
-            return nodeInfoDescription.get().getNodeBroker().getNodeTransportProtocol();
-        }
-        throw new SpRuntimeException("Could not find node description for id: " + id);
-    }
-
-    private static Optional<NodeInfoDescription> getNodeInfoDescriptionForId(String id){
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(id);
-    }
-
     private static boolean adapterManagedByNodeController(AdapterDescription desc) {
         return desc.getDeploymentTargetNodeId() != null && !desc.getDeploymentTargetNodeId().equals("default");
     }
 
-    private static boolean isEdgeOrFogNodeTarget(AdapterDescription desc) {
-        return NodeManagement.getInstance().getAllNodes().stream()
-                .filter(n -> n.getNodeControllerId().equals(desc.getDeploymentTargetNodeId()))
-                .anyMatch(n -> n.getStaticNodeMetadata().getType().equals("edge") ||
-                        n.getStaticNodeMetadata().getType().equals("fog"));
-    }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/NodeControllerService.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/NodeControllerService.java
new file mode 100644
index 0000000..644349a
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/NodeControllerService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.adapter;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class NodeControllerService {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeControllerService.class);
+    
+    public static boolean isEdgeOrFogNode(AdapterDescription desc) {
+        String baseUrl = generateBaseUrl(desc);
+        NodeInfoDescription node = getNodeDescription(baseUrl);
+        String nodeType = node.getStaticNodeMetadata().getType();
+        LOG.info("Present node type: " + nodeType);
+        return nodeType.equals("edge") || nodeType.equals("fog");
+    }
+
+    public static TransportProtocol getNodeTransportProtocol(AdapterDescription desc) {
+        String baseUrl = generateBaseUrl(desc);
+        NodeInfoDescription node = getNodeDescription(baseUrl);
+        TransportProtocol transportProtocol = node.getNodeBroker().getNodeTransportProtocol();
+        LOG.info("Present node transport protocol: " + transportProtocol.toString());
+        return transportProtocol;
+    }
+
+    public static NodeInfoDescription getNodeDescription(String baseUrl) {
+        String url = baseUrl + "/api/v2/node/info";
+
+        try {
+            String payload = Request.Get(url)
+                    .connectTimeout(1000)
+                    .socketTimeout(100000)
+                    .execute()
+                    .returnContent()
+                    .toString();
+
+            return JacksonSerializer
+                    .getObjectMapper()
+                    .readValue(payload, NodeInfoDescription.class);
+
+        } catch (IOException e) {
+            LOG.info("Could not connect to " + url);
+            throw new SpRuntimeException(e);
+        }
+    }
+
+    private static String generateBaseUrl(AdapterDescription desc) {
+        return "http://" + desc.getDeploymentTargetNodeHostname() + ":" + desc.getDeploymentTargetNodePort();
+    }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
index c4f63eb..ef79477 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
@@ -38,6 +38,8 @@ public abstract class AbstractStreamPipesDockerContainer {
 
     public List<ContainerEnvVar> generateStreamPipesNodeEnvs() {
         return new ArrayList<>(Arrays.asList(
+                toEnv(EnvConfigParam.NODE_TYPE.getEnvironmentKey(),
+                        NodeConfiguration.getNodeType()),
                 toEnv(EnvConfigParam.NODE_CONTROLLER_ID.getEnvironmentKey(),
                         NodeConfiguration.getNodeControllerId()),
                 toEnv(EnvConfigParam.NODE_CONTROLLER_CONTAINER_HOST.getEnvironmentKey(),
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index 22ad8c0..d60e5bd 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -206,7 +206,12 @@ export class PipelineComponent implements OnInit, OnDestroy {
         let pipelineElement: PipelineElementUnion = this.findPipelineElementByElementId(pipelineElementId);
         if (ui.draggable.hasClass('draggable-icon')) {
           this.EditorService.makePipelineAssemblyEmpty(false);
-          var pipelineElementConfig = this.JsplumbService.createNewPipelineElementConfig(pipelineElement, this.PipelineEditorService.getCoordinates(ui, this.currentZoomLevel), false, false);
+          let newElementId = pipelineElement.elementId + ":" + this.JsplumbService.makeId(5);
+          var pipelineElementConfig = this.JsplumbService.createNewPipelineElementConfig(pipelineElement,
+              this.PipelineEditorService.getCoordinates(ui, this.currentZoomLevel),
+              false,
+              false,
+              newElementId);
           if ((this.isStreamInPipeline() && pipelineElementConfig.type == 'set') ||
               this.isSetInPipeline() && pipelineElementConfig.type == 'stream') {
             this.showMixedStreamAlert();
diff --git a/ui/src/app/editor/services/jsplumb.service.ts b/ui/src/app/editor/services/jsplumb.service.ts
index 8bdcf2e..5114b18 100644
--- a/ui/src/app/editor/services/jsplumb.service.ts
+++ b/ui/src/app/editor/services/jsplumb.service.ts
@@ -137,13 +137,14 @@ export class JsplumbService {
     createNewPipelineElementConfig(pipelineElement: PipelineElementUnion,
                                    coordinates,
                                    isPreview: boolean,
-                                   isCompleted: boolean): PipelineElementConfig {
+                                   isCompleted: boolean,
+                                   newElementId?: string): PipelineElementConfig {
         let displaySettings = isPreview ? 'connectable-preview' : 'connectable-editor';
         let connectable = "connectable";
         let pipelineElementConfig = {} as PipelineElementConfig;
         pipelineElementConfig.type = PipelineElementTypeUtils
             .toCssShortHand(PipelineElementTypeUtils.fromType(pipelineElement))
-        pipelineElementConfig.payload = this.clone(pipelineElement);
+        pipelineElementConfig.payload = this.clone(pipelineElement, newElementId);
         pipelineElementConfig.settings = {connectable: connectable,
             openCustomize: !(pipelineElement as any).configured,
             preview: isPreview,
@@ -163,18 +164,31 @@ export class JsplumbService {
         return pipelineElementConfig;
     }
 
-    clone(pipelineElement: PipelineElementUnion) {
+    clone(pipelineElement: PipelineElementUnion, newElementId?: string) {
         if (pipelineElement instanceof SpDataSet) {
             return SpDataSet.fromData(pipelineElement, new SpDataSet());
         } else if (pipelineElement instanceof SpDataStream) {
             return SpDataStream.fromData(pipelineElement, new SpDataStream());
         } else if (pipelineElement instanceof DataProcessorInvocation) {
-            return DataProcessorInvocation.fromData(pipelineElement, new DataProcessorInvocation());
+            let clonedPe = DataProcessorInvocation.fromData(pipelineElement, new DataProcessorInvocation());
+            if (newElementId) {
+                this.updateElementIds(clonedPe, newElementId)
+            }
+            return clonedPe;
         } else {
-            return DataSinkInvocation.fromData(pipelineElement, new DataSinkInvocation());
+            let clonedPe = DataSinkInvocation.fromData(pipelineElement, new DataSinkInvocation());
+            if (newElementId) {
+                this.updateElementIds(clonedPe, newElementId);
+            }
+            return clonedPe;
         }
     }
 
+    updateElementIds(pipelineElement: PipelineElementUnion, newElementId: string) {
+        // Overwrite both identifiers of the element (elementId first, then uri) with the same new id.
+        Object.assign(pipelineElement, {elementId: newElementId, uri: newElementId});
+    }
+
     makeId(count: number) {
         var text = "";
         var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";